1use arrow::array::{
2 ArrayRef, BooleanArray, PrimitiveArray,
3 cast::AsArray,
4 types::{
5 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
6 TimestampSecondType,
7 },
8};
9use arrow::buffer::{BooleanBuffer, ScalarBuffer};
10use arrow::datatypes::{ArrowPrimitiveType, Date32Type, Int32Type, UInt32Type};
11use arrow_schema::{DataType, TimeUnit};
12use bytes::Bytes;
13use num_traits::AsPrimitive;
14use std::ops::Range;
15use std::sync::Arc;
16
17use super::LiquidArray;
18use super::primitive_array::LiquidPrimitiveArray;
19use super::{LiquidDataType, LiquidSqueezedArray};
20use crate::liquid_array::LiquidPrimitiveType;
21use crate::liquid_array::SqueezeIoHandler;
22use crate::liquid_array::raw::BitPackedArray;
23use crate::utils::get_bit_width;
24
/// Calendar component that a [`SqueezedDate32Array`] keeps from each date.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize)]
pub enum Date32Field {
    /// Proleptic Gregorian year (astronomical numbering: year 0 exists, earlier years are negative).
    Year,
    /// Month of the year, `1..=12`.
    Month,
    /// Day of the month, `1..=31`.
    Day,
    /// Day of the week, Sunday = 0 through Saturday = 6.
    DayOfWeek,
}
37
/// A lossy, squeezed form of a `Date32` or timestamp column that keeps only one calendar
/// component ([`Date32Field`]) per row.
///
/// Each component is stored as a non-negative offset from `reference_value` (the minimum
/// component over the valid rows), bit-packed at the narrowest width that fits the range.
/// The full-fidelity values are not held in memory; they are re-read through `backing`.
#[derive(Debug, Clone)]
pub struct SqueezedDate32Array {
    // Which calendar component the packed offsets encode.
    field: Date32Field,
    // Per-row `component - reference_value`; validity mirrors the source column's nulls.
    bit_packed: BitPackedArray<UInt32Type>,
    // Minimum component over the valid rows (0 when there are none).
    reference_value: i32,
    // Arrow type of the source column (`Date32` or a `Timestamp` type).
    original_data_type: DataType,
    // Location of the full-fidelity serialized array, once attached via `with_backing`.
    backing: Option<SqueezedBacking>,
}
52
/// Where the full-fidelity serialized array behind a squeezed array can be read from.
#[derive(Debug, Clone)]
struct SqueezedBacking {
    // Handler used to fetch the serialized bytes.
    io: Arc<dyn SqueezeIoHandler>,
    // Byte range handed to `io.read` to fetch the serialized array.
    disk_range: Range<u64>,
}
58
59impl SqueezedDate32Array {
60 pub fn from_liquid_date32<T: LiquidPrimitiveType>(
62 array: &LiquidPrimitiveArray<T>,
63 field: Date32Field,
64 ) -> Self {
65 let arrow_array: PrimitiveArray<Date32Type> =
67 array.to_arrow_array().as_primitive::<Date32Type>().clone();
68
69 let (_dt, values, nulls) = arrow_array.into_parts();
70
71 let mut has_value = false;
73 let mut min_component: i32 = i32::MAX;
74 let mut max_component: i32 = i32::MIN;
75
76 if let Some(nulls_buf) = &nulls
78 && nulls_buf.null_count() == values.len()
79 {
80 return Self {
81 field,
82 bit_packed: BitPackedArray::new_null_array(values.len()),
83 reference_value: 0,
84 original_data_type: DataType::Date32,
85 backing: None,
86 };
87 }
88
89 for (idx, &days) in values.iter().enumerate() {
90 if let Some(nulls_buf) = &nulls
91 && nulls_buf.is_null(idx)
92 {
93 continue;
94 }
95 let comp = component_from_days(field, days);
96 has_value = true;
97 if comp < min_component {
98 min_component = comp;
99 }
100 if comp > max_component {
101 max_component = comp;
102 }
103 }
104
105 if !has_value {
107 return Self {
108 field,
109 bit_packed: BitPackedArray::new_null_array(values.len()),
110 reference_value: 0,
111 original_data_type: DataType::Date32,
112 backing: None,
113 };
114 }
115
116 let max_offset = (max_component as i64 - min_component as i64) as u64;
118 let bit_width = get_bit_width(max_offset);
119
120 let offsets: ScalarBuffer<<UInt32Type as ArrowPrimitiveType>::Native> =
122 ScalarBuffer::from_iter((0..values.len()).map(|idx| {
123 if nulls.as_ref().is_some_and(|n| n.is_null(idx)) {
124 0u32
125 } else {
126 let comp = component_from_days(field, values[idx]);
127 (comp - min_component) as u32
128 }
129 }));
130
131 let unsigned_array = PrimitiveArray::<UInt32Type>::new(offsets, nulls);
132 let bit_packed = BitPackedArray::from_primitive(unsigned_array, bit_width);
133
134 Self {
135 field,
136 bit_packed,
137 reference_value: min_component,
138 original_data_type: DataType::Date32,
139 backing: None,
140 }
141 }
142
143 pub fn from_liquid_timestamp<T: LiquidPrimitiveType>(
145 array: &LiquidPrimitiveArray<T>,
146 field: Date32Field,
147 ) -> Self {
148 let unit = timestamp_unit(&T::DATA_TYPE).expect("timestamp data type");
149 let arrow_array: PrimitiveArray<T> = array.to_arrow_array().as_primitive::<T>().clone();
150 let (_dt, values, nulls) = arrow_array.into_parts();
151
152 let mut has_value = false;
153 let mut min_component: i32 = i32::MAX;
154 let mut max_component: i32 = i32::MIN;
155
156 if let Some(nulls_buf) = &nulls
157 && nulls_buf.null_count() == values.len()
158 {
159 return Self {
160 field,
161 bit_packed: BitPackedArray::new_null_array(values.len()),
162 reference_value: 0,
163 original_data_type: T::DATA_TYPE.clone(),
164 backing: None,
165 };
166 }
167
168 for (idx, &value) in values.iter().enumerate() {
169 if let Some(nulls_buf) = &nulls
170 && nulls_buf.is_null(idx)
171 {
172 continue;
173 }
174 let days = timestamp_to_days_since_epoch(value.as_(), unit);
175 let comp = component_from_days(field, days);
176 has_value = true;
177 if comp < min_component {
178 min_component = comp;
179 }
180 if comp > max_component {
181 max_component = comp;
182 }
183 }
184
185 if !has_value {
186 return Self {
187 field,
188 bit_packed: BitPackedArray::new_null_array(values.len()),
189 reference_value: 0,
190 original_data_type: T::DATA_TYPE.clone(),
191 backing: None,
192 };
193 }
194
195 let max_offset = (max_component as i64 - min_component as i64) as u64;
196 let bit_width = get_bit_width(max_offset);
197
198 let offsets: ScalarBuffer<<UInt32Type as ArrowPrimitiveType>::Native> =
199 ScalarBuffer::from_iter((0..values.len()).map(|idx| {
200 if nulls.as_ref().is_some_and(|n| n.is_null(idx)) {
201 0u32
202 } else {
203 let days = timestamp_to_days_since_epoch(values[idx].as_(), unit);
204 let comp = component_from_days(field, days);
205 (comp - min_component) as u32
206 }
207 }));
208
209 let unsigned_array = PrimitiveArray::<UInt32Type>::new(offsets, nulls);
210 let bit_packed = BitPackedArray::from_primitive(unsigned_array, bit_width);
211
212 Self {
213 field,
214 bit_packed,
215 reference_value: min_component,
216 original_data_type: T::DATA_TYPE.clone(),
217 backing: None,
218 }
219 }
220
221 pub(crate) fn with_backing(
222 mut self,
223 io: Arc<dyn SqueezeIoHandler>,
224 disk_range: Range<u64>,
225 ) -> Self {
226 self.backing = Some(SqueezedBacking { io, disk_range });
227 self
228 }
229
230 async fn read_backing(&self) -> Bytes {
231 let backing = self
232 .backing
233 .as_ref()
234 .expect("SqueezedDate32Array backing not set");
235 backing
236 .io
237 .read(Some(backing.disk_range.clone()))
238 .await
239 .expect("read squeezed backing")
240 }
241
242 pub fn len(&self) -> usize {
244 self.bit_packed.len()
245 }
246
247 pub fn is_empty(&self) -> bool {
249 self.len() == 0
250 }
251
252 pub fn get_array_memory_size(&self) -> usize {
254 self.bit_packed.get_array_memory_size() + std::mem::size_of::<i32>()
255 }
256
257 pub fn field(&self) -> Date32Field {
259 self.field
260 }
261
262 pub fn to_component_array(&self) -> ArrayRef {
264 match &self.original_data_type {
265 DataType::Date32 => Arc::new(self.to_component_date32()) as ArrayRef,
266 DataType::Timestamp(unit, _) => self.to_component_timestamp(*unit),
267 _ => Arc::new(self.to_component_date32()) as ArrayRef,
268 }
269 }
270
271 pub fn to_component_date32(&self) -> PrimitiveArray<Date32Type> {
274 let unsigned: PrimitiveArray<UInt32Type> = self.bit_packed.to_primitive();
275 let (_dt, values, nulls) = unsigned.into_parts();
276 let ref_v = self.reference_value;
277 let signed_values: ScalarBuffer<<Int32Type as ArrowPrimitiveType>::Native> =
278 ScalarBuffer::from_iter(values.iter().map(|&v| (v as i32).saturating_add(ref_v)));
279 PrimitiveArray::<Date32Type>::new(signed_values, nulls)
280 }
281
282 fn to_component_timestamp(&self, unit: TimeUnit) -> ArrayRef {
283 let unsigned: PrimitiveArray<UInt32Type> = self.bit_packed.to_primitive();
284 let (_dt, values, nulls) = unsigned.into_parts();
285 let ref_v = self.reference_value;
286 let signed_values: ScalarBuffer<i64> =
287 ScalarBuffer::from_iter(values.iter().map(|&v| (v as i32 + ref_v) as i64));
288
289 match unit {
290 TimeUnit::Second => Arc::new(PrimitiveArray::<TimestampSecondType>::new(
291 signed_values,
292 nulls,
293 )),
294 TimeUnit::Millisecond => Arc::new(PrimitiveArray::<TimestampMillisecondType>::new(
295 signed_values.clone(),
296 nulls,
297 )),
298 TimeUnit::Microsecond => Arc::new(PrimitiveArray::<TimestampMicrosecondType>::new(
299 signed_values.clone(),
300 nulls,
301 )),
302 TimeUnit::Nanosecond => Arc::new(PrimitiveArray::<TimestampNanosecondType>::new(
303 signed_values,
304 nulls,
305 )),
306 }
307 }
308
309 pub fn to_arrow_date32_lossy(&self) -> PrimitiveArray<Date32Type> {
317 let unsigned: PrimitiveArray<UInt32Type> = self.bit_packed.to_primitive();
318 let (_dt, values, nulls) = unsigned.into_parts();
319
320 let ref_v = self.reference_value;
321 let days_values: ScalarBuffer<<Date32Type as ArrowPrimitiveType>::Native> =
322 ScalarBuffer::from_iter(values.iter().enumerate().map(|(i, &off)| {
323 if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
324 0i32
325 } else {
326 match self.field {
327 Date32Field::Year => {
328 let y = ref_v + off as i32;
329 ymd_to_epoch_days(y, 1, 1)
330 }
331 Date32Field::Month => {
332 let m = (ref_v + off as i32) as u32;
333 ymd_to_epoch_days(1970, m, 1)
334 }
335 Date32Field::Day => {
336 let d = (ref_v + off as i32) as u32;
337 ymd_to_epoch_days(1970, 1, d)
338 }
339 Date32Field::DayOfWeek => {
340 let dow = ref_v + off as i32;
341 ymd_to_epoch_days(1970, 1, 4).saturating_add(dow)
342 }
343 }
344 }
345 }));
346
347 PrimitiveArray::<Date32Type>::new(days_values, nulls)
348 }
349}
350
/// Converts a day count since 1970-01-01 to a proleptic Gregorian `(year, month, day)`.
///
/// Works in 400-year eras of 146 097 days, counted from 0000-03-01. Starting the
/// computational year in March puts the leap day at the end of the year. Month is
/// `1..=12` and day is `1..=31`.
fn ymd_from_epoch_days(days_since_epoch: i32) -> (i32, u32, u32) {
    // Move the origin from 1970-01-01 back to 0000-03-01.
    let shifted = i64::from(days_since_epoch) + 719_468;
    let era = shifted.div_euclid(146_097);
    let day_of_era = shifted.rem_euclid(146_097); // [0, 146_096]
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // [0, 399]
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100); // [0, 365]
    let month_from_march = (5 * day_of_year + 2) / 153; // [0, 11], 0 = March
    let day = day_of_year - (153 * month_from_march + 2) / 5 + 1;
    let month = if month_from_march < 10 {
        month_from_march + 3
    } else {
        month_from_march - 9
    };
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + i64::from(month <= 2);
    (year as i32, month as u32, day as u32)
}
373
374fn component_from_days(field: Date32Field, days: i32) -> i32 {
375 let (year, month, day) = ymd_from_epoch_days(days);
376 match field {
377 Date32Field::Year => year,
378 Date32Field::Month => month as i32,
379 Date32Field::Day => day as i32,
380 Date32Field::DayOfWeek => day_of_week_sunday0(days),
381 }
382}
383
/// Day of the week for a day count since 1970-01-01, with Sunday = 0 … Saturday = 6.
///
/// 1970-01-01 was a Thursday (4). The count is reduced modulo 7 *before* adding that
/// offset, so the arithmetic stays in range for every `i32`. The naive `days + 4`
/// overflows (and panics in debug builds) for inputs within 3 of `i32::MAX`.
fn day_of_week_sunday0(days_since_epoch: i32) -> i32 {
    (days_since_epoch.rem_euclid(7) + 4) % 7
}
387
388fn timestamp_unit(data_type: &DataType) -> Option<TimeUnit> {
389 match data_type {
390 DataType::Timestamp(unit, _) => Some(*unit),
391 _ => None,
392 }
393}
394
395fn timestamp_to_days_since_epoch(value: i64, unit: TimeUnit) -> i32 {
396 let ticks_per_day = match unit {
397 TimeUnit::Second => 86_400,
398 TimeUnit::Millisecond => 86_400_000,
399 TimeUnit::Microsecond => 86_400_000_000,
400 TimeUnit::Nanosecond => 86_400_000_000_000,
401 };
402 (value.div_euclid(ticks_per_day)) as i32
403}
404
/// Converts a proleptic Gregorian `(year, month, day)` to a day count since 1970-01-01.
///
/// This is the inverse of `ymd_from_epoch_days`. `month` is expected in `1..=12`;
/// out-of-range inputs are not validated.
fn ymd_to_epoch_days(year: i32, month: u32, day: u32) -> i32 {
    let month = i64::from(month);
    let day = i64::from(day);
    // Count years from March so the leap day is the last day of the computational year.
    let march_year = i64::from(year) - i64::from(month <= 2);
    let era = march_year.div_euclid(400);
    let year_of_era = march_year.rem_euclid(400); // [0, 399]
    let month_from_march = if month > 2 { month - 3 } else { month + 9 }; // [0, 11]
    let day_of_year = (153 * month_from_march + 2) / 5 + day - 1; // [0, 365]
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    (era * 146_097 + day_of_era - 719_468) as i32
}
419
420#[async_trait::async_trait]
421impl LiquidSqueezedArray for SqueezedDate32Array {
422 fn as_any(&self) -> &dyn std::any::Any {
423 self
424 }
425
426 fn get_array_memory_size(&self) -> usize {
427 self.get_array_memory_size()
428 }
429
430 fn len(&self) -> usize {
431 self.len()
432 }
433
434 async fn to_arrow_array(&self) -> ArrayRef {
435 let bytes = self.read_backing().await;
436 let liquid = crate::liquid_array::ipc::read_from_bytes(
437 bytes,
438 &crate::liquid_array::ipc::LiquidIPCContext::new(None),
439 );
440 liquid.to_arrow_array()
441 }
442
443 fn data_type(&self) -> LiquidDataType {
444 LiquidDataType::Integer
445 }
446
447 fn original_arrow_data_type(&self) -> DataType {
448 self.original_data_type.clone()
449 }
450
451 async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
452 if selection.count_set_bits() == 0 {
453 return arrow::array::new_empty_array(&self.original_arrow_data_type());
454 }
455 let full = self.to_arrow_array().await;
456 let selection_array = BooleanArray::new(selection.clone(), None);
457 arrow::compute::filter(&full, &selection_array).unwrap()
458 }
459
460 async fn try_eval_predicate(
461 &self,
462 _predicate: &Arc<dyn datafusion::physical_plan::PhysicalExpr>,
463 _filter: &BooleanBuffer,
464 ) -> Option<BooleanArray> {
465 None
466 }
467}
468
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::types::TimestampMicrosecondType;
    use arrow::array::{Array, PrimitiveArray};
    use std::sync::Arc;

    /// Every extractable component, for tests that sweep all of them.
    const ALL_FIELDS: [Date32Field; 4] = [
        Date32Field::Year,
        Date32Field::Month,
        Date32Field::Day,
        Date32Field::DayOfWeek,
    ];

    /// Builds a nullable `Date32` array (values are days since 1970-01-01).
    fn dates(vals: &[Option<i32>]) -> PrimitiveArray<Date32Type> {
        PrimitiveArray::<Date32Type>::from(vals.to_vec())
    }

    /// Asserts two primitive arrays are equal, comparing them as `dyn Array`.
    fn assert_prim_eq<T: ArrowPrimitiveType>(
        actual: PrimitiveArray<T>,
        expected: PrimitiveArray<T>,
    ) {
        let actual: arrow::array::ArrayRef = Arc::new(actual);
        let expected: arrow::array::ArrayRef = Arc::new(expected);
        assert_eq!(actual.as_ref(), expected.as_ref());
    }

    /// Squeezes `input` (days since epoch) down to `field`.
    fn squeeze(field: Date32Field, input: Vec<Option<i32>>) -> SqueezedDate32Array {
        let liquid = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(dates(&input));
        SqueezedDate32Array::from_liquid_date32(&liquid, field)
    }

    fn extract(field: Date32Field, input: Vec<Option<i32>>) -> PrimitiveArray<Date32Type> {
        squeeze(field, input).to_component_date32()
    }

    fn lossy(field: Date32Field, input: Vec<Option<i32>>) -> PrimitiveArray<Date32Type> {
        squeeze(field, input).to_arrow_date32_lossy()
    }

    #[test]
    fn test_extraction_correctness() {
        let cases = [
            (
                Date32Field::Year,
                vec![
                    Some(-1),
                    Some(0),
                    Some(ymd_to_epoch_days(1971, 7, 15)),
                    None,
                ],
                vec![Some(1969), Some(1970), Some(1971), None],
            ),
            (
                Date32Field::Month,
                vec![
                    Some(ymd_to_epoch_days(1970, 1, 31)),
                    Some(ymd_to_epoch_days(1970, 2, 1)),
                    Some(ymd_to_epoch_days(1970, 12, 31)),
                    None,
                ],
                vec![Some(1), Some(2), Some(12), None],
            ),
            (
                Date32Field::Day,
                vec![
                    Some(ymd_to_epoch_days(1970, 1, 1)),
                    Some(ymd_to_epoch_days(1970, 1, 31)),
                    Some(ymd_to_epoch_days(1970, 2, 1)),
                    None,
                ],
                vec![Some(1), Some(31), Some(1), None],
            ),
            (
                Date32Field::DayOfWeek,
                vec![
                    Some(ymd_to_epoch_days(1970, 1, 4)),
                    Some(ymd_to_epoch_days(1970, 1, 5)),
                    Some(ymd_to_epoch_days(1970, 1, 10)),
                    None,
                ],
                vec![Some(0), Some(1), Some(6), None],
            ),
        ];

        for (field, input, expected) in cases {
            assert_prim_eq(extract(field, input), dates(&expected));
        }
    }

    #[test]
    fn test_lossy_reconstruction_mapping() {
        let cases = [
            (
                Date32Field::Year,
                vec![
                    Some(ymd_to_epoch_days(1999, 12, 31)),
                    Some(ymd_to_epoch_days(2000, 6, 1)),
                    None,
                ],
                vec![
                    Some(ymd_to_epoch_days(1999, 1, 1)),
                    Some(ymd_to_epoch_days(2000, 1, 1)),
                    None,
                ],
            ),
            (
                Date32Field::Month,
                vec![
                    Some(ymd_to_epoch_days(1980, 3, 14)),
                    Some(ymd_to_epoch_days(1977, 12, 5)),
                    None,
                ],
                vec![
                    Some(ymd_to_epoch_days(1970, 3, 1)),
                    Some(ymd_to_epoch_days(1970, 12, 1)),
                    None,
                ],
            ),
            (
                Date32Field::Day,
                vec![
                    Some(ymd_to_epoch_days(1980, 3, 14)),
                    Some(ymd_to_epoch_days(1977, 12, 5)),
                    None,
                ],
                vec![
                    Some(ymd_to_epoch_days(1970, 1, 14)),
                    Some(ymd_to_epoch_days(1970, 1, 5)),
                    None,
                ],
            ),
            (
                Date32Field::DayOfWeek,
                vec![
                    Some(ymd_to_epoch_days(2020, 5, 17)),
                    Some(ymd_to_epoch_days(2020, 5, 18)),
                    None,
                ],
                vec![
                    Some(ymd_to_epoch_days(1970, 1, 4)),
                    Some(ymd_to_epoch_days(1970, 1, 5)),
                    None,
                ],
            ),
        ];

        for (field, input, expected) in cases {
            assert_prim_eq(lossy(field, input), dates(&expected));
        }
    }

    #[test]
    fn test_roundtrip_idempotence() {
        let input = vec![
            Some(ymd_to_epoch_days(1969, 12, 31)),
            Some(ymd_to_epoch_days(1970, 1, 1)),
            Some(ymd_to_epoch_days(1970, 1, 31)),
            Some(ymd_to_epoch_days(1970, 2, 1)),
            Some(ymd_to_epoch_days(1971, 7, 15)),
            None,
        ];

        for field in ALL_FIELDS {
            let first_pass = extract(field, input.clone());
            let reconstructed = lossy(field, input.clone());
            let reliquid = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(reconstructed);
            let second_pass =
                SqueezedDate32Array::from_liquid_date32(&reliquid, field).to_component_date32();
            assert_prim_eq(first_pass, second_pass);
        }
    }

    #[test]
    fn test_all_nulls_behavior() {
        let all_null: Vec<Option<i32>> = vec![None; 3];

        for field in ALL_FIELDS {
            assert_prim_eq(extract(field, all_null.clone()), dates(&[None, None, None]));
            assert_prim_eq(lossy(field, all_null.clone()), dates(&[None, None, None]));
        }
    }

    #[test]
    fn test_timestamp_extraction() {
        // 2021-01-01T00:00:00 and 2022-01-01T00:00:00 (UTC) in microseconds.
        let micros = PrimitiveArray::<TimestampMicrosecondType>::from(vec![
            Some(1_609_459_200_000_000),
            Some(1_640_995_200_000_000),
            None,
        ]);
        let liquid = LiquidPrimitiveArray::<TimestampMicrosecondType>::from_arrow_array(micros);
        let component = SqueezedDate32Array::from_liquid_timestamp(&liquid, Date32Field::Year)
            .to_component_array();
        let years = component
            .as_any()
            .downcast_ref::<PrimitiveArray<TimestampMicrosecondType>>()
            .expect("timestamp array");

        assert_eq!(years.value(0), 2021);
        assert_eq!(years.value(1), 2022);
        assert!(years.is_null(2));
    }
}
665}