Skip to main content

liquid_cache/liquid_array/
squeezed_date32_array.rs

1use arrow::array::{
2    ArrayRef, BooleanArray, PrimitiveArray,
3    cast::AsArray,
4    types::{
5        TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
6        TimestampSecondType,
7    },
8};
9use arrow::buffer::{BooleanBuffer, ScalarBuffer};
10use arrow::datatypes::{ArrowPrimitiveType, Date32Type, Int32Type, UInt32Type};
11use arrow_schema::{DataType, TimeUnit};
12use bytes::Bytes;
13use num_traits::AsPrimitive;
14use std::ops::Range;
15use std::sync::Arc;
16
17use super::LiquidArray;
18use super::primitive_array::LiquidPrimitiveArray;
19use super::{LiquidDataType, LiquidSqueezedArray, SqueezedBacking};
20use crate::cache::LiquidExpr;
21use crate::liquid_array::LiquidPrimitiveType;
22use crate::liquid_array::SqueezeIoHandler;
23use crate::liquid_array::eval_predicate_on_array;
24use crate::liquid_array::raw::BitPackedArray;
25use crate::utils::get_bit_width;
26
/// Which component to extract from a `Date32`/Timestamp (days since UNIX epoch).
///
/// The chosen variant decides both which value `component_from_days` extracts
/// and how `SqueezedDate32Array::to_arrow_date32_lossy` maps a stored component
/// back onto a representative date.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize)]
pub enum Date32Field {
    /// Year component
    Year,
    /// Month component
    Month,
    /// Day component
    Day,
    /// Day-of-week component (Sunday=0).
    DayOfWeek,
}
39
/// A bit-packed array that stores a single extracted component (YEAR/MONTH/DAY/DOW)
/// from a `Date32`/Timestamp array.
///
/// Values are stored as unsigned offsets from `reference_value`, using the same
/// bit-packing machinery as primitive arrays.
#[derive(Debug, Clone)]
pub struct SqueezedDate32Array {
    // Which component the packed values represent.
    field: Date32Field,
    // Packed `component - reference_value` offsets; also carries the null mask.
    bit_packed: BitPackedArray<UInt32Type>,
    /// The minimum extracted value used as reference for offsetting.
    // Set to 0 when the source array had no non-null values.
    reference_value: i32,
    // Arrow type of the source array; selects the output type of `to_component_array`.
    original_data_type: DataType,
    // Where the full (unsqueezed) data can be read back from; `None` until
    // `with_backing` is called.
    backing: Option<DiskBacking>,
}
54
/// Location of the full data for a squeezed array, plus the handler used to read it.
#[derive(Debug, Clone)]
struct DiskBacking {
    // Handler whose `read` is called to fetch the backing data.
    io: Arc<dyn SqueezeIoHandler>,
    // Range handed to `io.read`; presumably a byte range within the backing
    // store (its length is reported as the backing size) — confirm against
    // `SqueezeIoHandler`.
    disk_range: Range<u64>,
}
60
61impl SqueezedDate32Array {
62    /// Build a squeezed representation (YEAR/MONTH/DAY/DAYOFWEEK) from a `Date32` array.
63    pub fn from_liquid_date32<T: LiquidPrimitiveType>(
64        array: &LiquidPrimitiveArray<T>,
65        field: Date32Field,
66    ) -> Self {
67        // Decode the logical Date32 array (i32: days since epoch) from the liquid array.
68        let arrow_array: PrimitiveArray<Date32Type> =
69            array.to_arrow_array().as_primitive::<Date32Type>().clone();
70
71        let (_dt, values, nulls) = arrow_array.into_parts();
72
73        // Compute min and max for the extracted component, skipping nulls.
74        let mut has_value = false;
75        let mut min_component: i32 = i32::MAX;
76        let mut max_component: i32 = i32::MIN;
77
78        // Fast path: if all nulls, return a null bit-packed array of the same length.
79        if let Some(nulls_buf) = &nulls
80            && nulls_buf.null_count() == values.len()
81        {
82            return Self {
83                field,
84                bit_packed: BitPackedArray::new_null_array(values.len()),
85                reference_value: 0,
86                original_data_type: DataType::Date32,
87                backing: None,
88            };
89        }
90
91        for (idx, &days) in values.iter().enumerate() {
92            if let Some(nulls_buf) = &nulls
93                && nulls_buf.is_null(idx)
94            {
95                continue;
96            }
97            let comp = component_from_days(field, days);
98            has_value = true;
99            if comp < min_component {
100                min_component = comp;
101            }
102            if comp > max_component {
103                max_component = comp;
104            }
105        }
106
107        // If no non-null values found, return an all-null structure (defensive)
108        if !has_value {
109            return Self {
110                field,
111                bit_packed: BitPackedArray::new_null_array(values.len()),
112                reference_value: 0,
113                original_data_type: DataType::Date32,
114                backing: None,
115            };
116        }
117
118        // Compute bit width from the value range.
119        let max_offset = (max_component as i64 - min_component as i64) as u64;
120        let bit_width = get_bit_width(max_offset);
121
122        // Build unsigned offsets for packing; placeholders are fine for nulls.
123        let offsets: ScalarBuffer<<UInt32Type as ArrowPrimitiveType>::Native> =
124            ScalarBuffer::from_iter((0..values.len()).map(|idx| {
125                if nulls.as_ref().is_some_and(|n| n.is_null(idx)) {
126                    0u32
127                } else {
128                    let comp = component_from_days(field, values[idx]);
129                    (comp - min_component) as u32
130                }
131            }));
132
133        let unsigned_array = PrimitiveArray::<UInt32Type>::new(offsets, nulls);
134        let bit_packed = BitPackedArray::from_primitive(unsigned_array, bit_width);
135
136        Self {
137            field,
138            bit_packed,
139            reference_value: min_component,
140            original_data_type: DataType::Date32,
141            backing: None,
142        }
143    }
144
145    /// Build a squeezed representation (YEAR/MONTH/DAY/DAYOFWEEK) from a timestamp array.
146    pub fn from_liquid_timestamp<T: LiquidPrimitiveType>(
147        array: &LiquidPrimitiveArray<T>,
148        field: Date32Field,
149    ) -> Self {
150        let unit = timestamp_unit(&T::DATA_TYPE).expect("timestamp data type");
151        let arrow_array: PrimitiveArray<T> = array.to_arrow_array().as_primitive::<T>().clone();
152        let (_dt, values, nulls) = arrow_array.into_parts();
153
154        let mut has_value = false;
155        let mut min_component: i32 = i32::MAX;
156        let mut max_component: i32 = i32::MIN;
157
158        if let Some(nulls_buf) = &nulls
159            && nulls_buf.null_count() == values.len()
160        {
161            return Self {
162                field,
163                bit_packed: BitPackedArray::new_null_array(values.len()),
164                reference_value: 0,
165                original_data_type: T::DATA_TYPE.clone(),
166                backing: None,
167            };
168        }
169
170        for (idx, &value) in values.iter().enumerate() {
171            if let Some(nulls_buf) = &nulls
172                && nulls_buf.is_null(idx)
173            {
174                continue;
175            }
176            let days = timestamp_to_days_since_epoch(value.as_(), unit);
177            let comp = component_from_days(field, days);
178            has_value = true;
179            if comp < min_component {
180                min_component = comp;
181            }
182            if comp > max_component {
183                max_component = comp;
184            }
185        }
186
187        if !has_value {
188            return Self {
189                field,
190                bit_packed: BitPackedArray::new_null_array(values.len()),
191                reference_value: 0,
192                original_data_type: T::DATA_TYPE.clone(),
193                backing: None,
194            };
195        }
196
197        let max_offset = (max_component as i64 - min_component as i64) as u64;
198        let bit_width = get_bit_width(max_offset);
199
200        let offsets: ScalarBuffer<<UInt32Type as ArrowPrimitiveType>::Native> =
201            ScalarBuffer::from_iter((0..values.len()).map(|idx| {
202                if nulls.as_ref().is_some_and(|n| n.is_null(idx)) {
203                    0u32
204                } else {
205                    let days = timestamp_to_days_since_epoch(values[idx].as_(), unit);
206                    let comp = component_from_days(field, days);
207                    (comp - min_component) as u32
208                }
209            }));
210
211        let unsigned_array = PrimitiveArray::<UInt32Type>::new(offsets, nulls);
212        let bit_packed = BitPackedArray::from_primitive(unsigned_array, bit_width);
213
214        Self {
215            field,
216            bit_packed,
217            reference_value: min_component,
218            original_data_type: T::DATA_TYPE.clone(),
219            backing: None,
220        }
221    }
222
223    pub(crate) fn with_backing(
224        mut self,
225        io: Arc<dyn SqueezeIoHandler>,
226        disk_range: Range<u64>,
227    ) -> Self {
228        self.backing = Some(DiskBacking { io, disk_range });
229        self
230    }
231
232    async fn read_backing(&self) -> Bytes {
233        let backing = self
234            .backing
235            .as_ref()
236            .expect("SqueezedDate32Array backing not set");
237        backing
238            .io
239            .read(Some(backing.disk_range.clone()))
240            .await
241            .expect("read squeezed backing")
242    }
243
    /// Length of the array.
    ///
    /// This is the number of slots (null or not) in the bit-packed storage.
    pub fn len(&self) -> usize {
        self.bit_packed.len()
    }
248
249    /// Whether the array has no elements.
250    pub fn is_empty(&self) -> bool {
251        self.len() == 0
252    }
253
254    /// Memory size of the bit-packed representation plus reference value.
255    pub fn get_array_memory_size(&self) -> usize {
256        self.bit_packed.get_array_memory_size() + std::mem::size_of::<i32>()
257    }
258
    /// The extracted component type.
    ///
    /// Returned by value; `Date32Field` is `Copy`.
    pub fn field(&self) -> Date32Field {
        self.field
    }
263
264    /// Convert to an Arrow array shaped like the original input, encoded so that
265    /// re-applying `date_part` (or any equivalent extraction) recovers the
266    /// component value originally squeezed.
267    pub fn to_component_array(&self) -> ArrayRef {
268        match &self.original_data_type {
269            DataType::Date32 => Arc::new(self.to_arrow_date32_lossy()) as ArrayRef,
270            DataType::Timestamp(unit, _) => self.to_arrow_timestamp_lossy(*unit),
271            _ => Arc::new(self.to_arrow_date32_lossy()) as ArrayRef,
272        }
273    }
274
275    /// Convert back to an Arrow `Int32` array representing the extracted component values.
276    /// Useful for verification or future pushdown logic.
277    pub fn to_component_date32(&self) -> PrimitiveArray<Date32Type> {
278        let unsigned: PrimitiveArray<UInt32Type> = self.bit_packed.to_primitive();
279        let (_dt, values, nulls) = unsigned.into_parts();
280        let ref_v = self.reference_value;
281        let signed_values: ScalarBuffer<<Int32Type as ArrowPrimitiveType>::Native> =
282            ScalarBuffer::from_iter(values.iter().map(|&v| (v as i32).saturating_add(ref_v)));
283        PrimitiveArray::<Date32Type>::new(signed_values, nulls)
284    }
285
286    /// Lossy reconstruction to Arrow Timestamp at the requested unit, using the
287    /// same date mapping as [`Self::to_arrow_date32_lossy`] (midnight UTC of the
288    /// reconstructed date).
289    pub fn to_arrow_timestamp_lossy(&self, unit: TimeUnit) -> ArrayRef {
290        let date_array = self.to_arrow_date32_lossy();
291        let (_dt, day_values, nulls) = date_array.into_parts();
292        let ticks_per_day: i64 = match unit {
293            TimeUnit::Second => 86_400,
294            TimeUnit::Millisecond => 86_400_000,
295            TimeUnit::Microsecond => 86_400_000_000,
296            TimeUnit::Nanosecond => 86_400_000_000_000,
297        };
298        let tick_values: ScalarBuffer<i64> =
299            ScalarBuffer::from_iter(day_values.iter().map(|&d| (d as i64) * ticks_per_day));
300        match unit {
301            TimeUnit::Second => Arc::new(PrimitiveArray::<TimestampSecondType>::new(
302                tick_values,
303                nulls,
304            )),
305            TimeUnit::Millisecond => Arc::new(PrimitiveArray::<TimestampMillisecondType>::new(
306                tick_values,
307                nulls,
308            )),
309            TimeUnit::Microsecond => Arc::new(PrimitiveArray::<TimestampMicrosecondType>::new(
310                tick_values,
311                nulls,
312            )),
313            TimeUnit::Nanosecond => Arc::new(PrimitiveArray::<TimestampNanosecondType>::new(
314                tick_values,
315                nulls,
316            )),
317        }
318    }
319
320    /// Lossy reconstruction to Arrow `Date32` (days since epoch).
321    ///
322    /// Mapping used:
323    /// - Year: (year, 1, 1)
324    /// - Month: (1970, month, 1)
325    /// - Day: (1970, 1, day)
326    /// - DayOfWeek: (1970, 1, 4 + dow) where 1970-01-04 is Sunday
327    pub fn to_arrow_date32_lossy(&self) -> PrimitiveArray<Date32Type> {
328        let unsigned: PrimitiveArray<UInt32Type> = self.bit_packed.to_primitive();
329        let (_dt, values, nulls) = unsigned.into_parts();
330
331        let ref_v = self.reference_value;
332        let days_values: ScalarBuffer<<Date32Type as ArrowPrimitiveType>::Native> =
333            ScalarBuffer::from_iter(values.iter().enumerate().map(|(i, &off)| {
334                if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
335                    0i32
336                } else {
337                    match self.field {
338                        Date32Field::Year => {
339                            let y = ref_v + off as i32;
340                            ymd_to_epoch_days(y, 1, 1)
341                        }
342                        Date32Field::Month => {
343                            let m = (ref_v + off as i32) as u32;
344                            ymd_to_epoch_days(1970, m, 1)
345                        }
346                        Date32Field::Day => {
347                            let d = (ref_v + off as i32) as u32;
348                            ymd_to_epoch_days(1970, 1, d)
349                        }
350                        Date32Field::DayOfWeek => {
351                            let dow = ref_v + off as i32;
352                            ymd_to_epoch_days(1970, 1, 4).saturating_add(dow)
353                        }
354                    }
355                }
356            }));
357
358        PrimitiveArray::<Date32Type>::new(days_values, nulls)
359    }
360}
361
/// Split a day count since the UNIX epoch (1970-01-01) into `(year, month, day)`
/// of the proleptic Gregorian calendar.
///
/// Integer-only implementation of Howard Hinnant's `civil_from_days`: the
/// calendar is shifted so that each 400-year era starts on March 1st, which
/// puts the leap day at the very end of a year.
fn ymd_from_epoch_days(days_since_epoch: i32) -> (i32, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097;
    const EPOCH_SHIFT: i64 = 719_468; // days from 0000-03-01 to 1970-01-01

    let shifted = i64::from(days_since_epoch) + EPOCH_SHIFT;
    // Floor division / non-negative remainder handle dates before the civil epoch.
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA); // [0, 146096]
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // [0, 399]
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100); // [0, 365]
    let month_index = (5 * day_of_year + 2) / 153; // [0, 11], 0 = March
    let day = day_of_year - (153 * month_index + 2) / 5 + 1; // [1, 31]

    // January and February belong to the following civil year.
    let (month, year_carry) = if month_index < 10 {
        (month_index + 3, 0)
    } else {
        (month_index - 9, 1)
    };
    let year = year_of_era + era * 400 + year_carry;

    (year as i32, month as u32, day as u32)
}
384
385fn component_from_days(field: Date32Field, days: i32) -> i32 {
386    let (year, month, day) = ymd_from_epoch_days(days);
387    match field {
388        Date32Field::Year => year,
389        Date32Field::Month => month as i32,
390        Date32Field::Day => day as i32,
391        Date32Field::DayOfWeek => day_of_week_sunday0(days),
392    }
393}
394
/// Day of the week for a day count since the UNIX epoch, numbered
/// Sunday = 0 through Saturday = 6.
///
/// Day 0 (1970-01-01) was a Thursday, i.e. 4, hence the `+ 4` shift. The
/// Euclidean remainder keeps the result in `0..7` for negative day counts.
fn day_of_week_sunday0(days_since_epoch: i32) -> i32 {
    // Widen before adding: `days + 4` overflows `i32` for days near `i32::MAX`.
    (i64::from(days_since_epoch) + 4).rem_euclid(7) as i32
}
398
399fn timestamp_unit(data_type: &DataType) -> Option<TimeUnit> {
400    match data_type {
401        DataType::Timestamp(unit, _) => Some(*unit),
402        _ => None,
403    }
404}
405
406fn timestamp_to_days_since_epoch(value: i64, unit: TimeUnit) -> i32 {
407    let ticks_per_day = match unit {
408        TimeUnit::Second => 86_400,
409        TimeUnit::Millisecond => 86_400_000,
410        TimeUnit::Microsecond => 86_400_000_000,
411        TimeUnit::Nanosecond => 86_400_000_000_000,
412    };
413    (value.div_euclid(ticks_per_day)) as i32
414}
415
/// Convert a date (year, month, day) in proleptic Gregorian calendar to
/// days since UNIX epoch (1970-01-01).
///
/// Integer-only implementation of Howard Hinnant's `days_from_civil`.
/// `month` is expected in `1..=12` and `day` in `1..=31`.
///
/// The result saturates at `i32::MIN` / `i32::MAX` when the date lies outside
/// the range a `Date32` can represent, instead of silently wrapping around.
/// This matters for the earliest year reachable by `Date32`: its January 1st
/// is before `i32::MIN` days, and a wrapped value would land millions of
/// years in the future.
fn ymd_to_epoch_days(year: i32, month: u32, day: u32) -> i32 {
    // Years start in March so the leap day is the last day of the year.
    let y = i64::from(year) - i64::from(month <= 2);
    let era = y.div_euclid(400);
    let yoe = y.rem_euclid(400); // [0, 399]
    let m = i64::from(month);
    let d = i64::from(day);
    let mp = if m > 2 { m - 3 } else { m + 9 }; // Mar=0..Jan=10,Feb=11
    let doy = (153 * mp + 2) / 5 + d - 1; // [0, 365]
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; // [0, 146096]
    let days = era * 146_097 + doe - 719_468;
    days.clamp(i64::from(i32::MIN), i64::from(i32::MAX)) as i32
}
430
431#[async_trait::async_trait]
432impl LiquidSqueezedArray for SqueezedDate32Array {
433    fn as_any(&self) -> &dyn std::any::Any {
434        self
435    }
436
437    fn get_array_memory_size(&self) -> usize {
438        self.get_array_memory_size()
439    }
440
441    fn len(&self) -> usize {
442        self.len()
443    }
444
445    async fn to_arrow_array(&self) -> ArrayRef {
446        let bytes = self.read_backing().await;
447        let liquid = crate::liquid_array::ipc::read_from_bytes(
448            bytes,
449            &crate::liquid_array::ipc::LiquidIPCContext::new(None),
450        );
451        liquid.to_arrow_array()
452    }
453
454    fn data_type(&self) -> LiquidDataType {
455        LiquidDataType::Integer
456    }
457
458    fn original_arrow_data_type(&self) -> DataType {
459        self.original_data_type.clone()
460    }
461
462    fn disk_backing(&self) -> SqueezedBacking {
463        let backing = self
464            .backing
465            .as_ref()
466            .expect("SqueezedDate32Array backing not set");
467        SqueezedBacking::Liquid((backing.disk_range.end - backing.disk_range.start) as usize)
468    }
469
470    async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
471        if selection.count_set_bits() == 0 {
472            return arrow::array::new_empty_array(&self.original_arrow_data_type());
473        }
474        let full = self.to_arrow_array().await;
475        let selection_array = BooleanArray::new(selection.clone(), None);
476        arrow::compute::filter(&full, &selection_array).unwrap()
477    }
478
479    async fn try_eval_predicate(
480        &self,
481        predicate: &LiquidExpr,
482        filter: &BooleanBuffer,
483    ) -> BooleanArray {
484        let filtered = self.filter(filter).await;
485        eval_predicate_on_array(filtered, predicate)
486    }
487}
488
// Unit tests for component extraction, lossy reconstruction and the
// `to_component_array` round-trip contract.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::types::TimestampMicrosecondType;
    use arrow::array::{Array, PrimitiveArray};
    use std::sync::Arc;

    /// Build a `Date32` array from optional day counts.
    fn dates(vals: &[Option<i32>]) -> PrimitiveArray<Date32Type> {
        PrimitiveArray::<Date32Type>::from(vals.to_vec())
    }

    /// Assert that two primitive arrays are equal, comparing them as `dyn Array`.
    fn assert_prim_eq<T: ArrowPrimitiveType>(a: PrimitiveArray<T>, b: PrimitiveArray<T>) {
        let a_ref: arrow::array::ArrayRef = Arc::new(a);
        let b_ref: arrow::array::ArrayRef = Arc::new(b);
        assert_eq!(a_ref.as_ref(), b_ref.as_ref());
    }

    /// Squeeze `input` for `field` and return the raw component values.
    fn extract(field: Date32Field, input: Vec<Option<i32>>) -> PrimitiveArray<Date32Type> {
        let arr = dates(&input);
        let liquid = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(arr);
        let squeezed = SqueezedDate32Array::from_liquid_date32(&liquid, field);
        squeezed.to_component_date32()
    }

    /// Squeeze `input` for `field` and return the lossy `Date32` reconstruction.
    fn lossy(field: Date32Field, input: Vec<Option<i32>>) -> PrimitiveArray<Date32Type> {
        let arr = dates(&input);
        let liquid = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(arr);
        let squeezed = SqueezedDate32Array::from_liquid_date32(&liquid, field);
        squeezed.to_arrow_date32_lossy()
    }

    // Each field extracts the expected component, including dates before the
    // epoch and null slots.
    #[test]
    fn test_extraction_correctness() {
        // YEAR
        let input = vec![
            Some(-1),
            Some(0),
            Some(ymd_to_epoch_days(1971, 7, 15)),
            None,
        ];
        let expected =
            PrimitiveArray::<Date32Type>::from(vec![Some(1969), Some(1970), Some(1971), None]);
        assert_prim_eq(extract(Date32Field::Year, input), expected);

        // MONTH
        let input = vec![
            Some(ymd_to_epoch_days(1970, 1, 31)),
            Some(ymd_to_epoch_days(1970, 2, 1)),
            Some(ymd_to_epoch_days(1970, 12, 31)),
            None,
        ];
        let expected = PrimitiveArray::<Date32Type>::from(vec![Some(1), Some(2), Some(12), None]);
        assert_prim_eq(extract(Date32Field::Month, input), expected);

        // DAY
        let input = vec![
            Some(ymd_to_epoch_days(1970, 1, 1)),
            Some(ymd_to_epoch_days(1970, 1, 31)),
            Some(ymd_to_epoch_days(1970, 2, 1)),
            None,
        ];
        let expected = PrimitiveArray::<Date32Type>::from(vec![Some(1), Some(31), Some(1), None]);
        assert_prim_eq(extract(Date32Field::Day, input), expected);

        // DAYOFWEEK (Sunday=0)
        let input = vec![
            Some(ymd_to_epoch_days(1970, 1, 4)),
            Some(ymd_to_epoch_days(1970, 1, 5)),
            Some(ymd_to_epoch_days(1970, 1, 10)),
            None,
        ];
        let expected = PrimitiveArray::<Date32Type>::from(vec![Some(0), Some(1), Some(6), None]);
        assert_prim_eq(extract(Date32Field::DayOfWeek, input), expected);
    }

    // The lossy reconstruction uses the documented representative date per field.
    #[test]
    fn test_lossy_reconstruction_mapping() {
        // YEAR → (y,1,1)
        let input = vec![
            Some(ymd_to_epoch_days(1999, 12, 31)),
            Some(ymd_to_epoch_days(2000, 6, 1)),
            None,
        ];
        let expected = PrimitiveArray::<Date32Type>::from(vec![
            Some(ymd_to_epoch_days(1999, 1, 1)),
            Some(ymd_to_epoch_days(2000, 1, 1)),
            None,
        ]);
        assert_prim_eq(lossy(Date32Field::Year, input), expected);

        // MONTH → (1970,m,1)
        let input = vec![
            Some(ymd_to_epoch_days(1980, 3, 14)),
            Some(ymd_to_epoch_days(1977, 12, 5)),
            None,
        ];
        let expected = PrimitiveArray::<Date32Type>::from(vec![
            Some(ymd_to_epoch_days(1970, 3, 1)),
            Some(ymd_to_epoch_days(1970, 12, 1)),
            None,
        ]);
        assert_prim_eq(lossy(Date32Field::Month, input), expected);

        // DAY → (1970,1,d)
        let input = vec![
            Some(ymd_to_epoch_days(1980, 3, 14)),
            Some(ymd_to_epoch_days(1977, 12, 5)),
            None,
        ];
        let expected = PrimitiveArray::<Date32Type>::from(vec![
            Some(ymd_to_epoch_days(1970, 1, 14)),
            Some(ymd_to_epoch_days(1970, 1, 5)),
            None,
        ]);
        assert_prim_eq(lossy(Date32Field::Day, input), expected);

        // DAYOFWEEK → (1970,1,4 + dow)
        let input = vec![
            Some(ymd_to_epoch_days(2020, 5, 17)),
            Some(ymd_to_epoch_days(2020, 5, 18)),
            None,
        ];
        let expected = PrimitiveArray::<Date32Type>::from(vec![
            Some(ymd_to_epoch_days(1970, 1, 4)),
            Some(ymd_to_epoch_days(1970, 1, 5)),
            None,
        ]);
        assert_prim_eq(lossy(Date32Field::DayOfWeek, input), expected);
    }

    // Squeezing the lossy reconstruction again yields the same components as
    // squeezing the original input.
    #[test]
    fn test_roundtrip_idempotence() {
        let input = vec![
            Some(ymd_to_epoch_days(1969, 12, 31)),
            Some(ymd_to_epoch_days(1970, 1, 1)),
            Some(ymd_to_epoch_days(1970, 1, 31)),
            Some(ymd_to_epoch_days(1970, 2, 1)),
            Some(ymd_to_epoch_days(1971, 7, 15)),
            None,
        ];

        for &field in &[
            Date32Field::Year,
            Date32Field::Month,
            Date32Field::Day,
            Date32Field::DayOfWeek,
        ] {
            let comp1 = extract(field, input.clone());
            let lossy_dt = lossy(field, input.clone());
            let liquid2 = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(lossy_dt);
            let comp2 =
                SqueezedDate32Array::from_liquid_date32(&liquid2, field).to_component_date32();
            assert_prim_eq(comp1, comp2);
        }
    }

    // An all-null input stays all-null in both output views, for every field.
    #[test]
    fn test_all_nulls_behavior() {
        let input = vec![None, None, None];

        for &field in &[
            Date32Field::Year,
            Date32Field::Month,
            Date32Field::Day,
            Date32Field::DayOfWeek,
        ] {
            let comp = extract(field, input.clone());
            let expected_comp = PrimitiveArray::<Date32Type>::from(vec![None, None, None]);
            assert_prim_eq(comp, expected_comp);

            let lossy_dt = lossy(field, input.clone());
            let expected_dt = PrimitiveArray::<Date32Type>::from(vec![None, None, None]);
            assert_prim_eq(lossy_dt, expected_dt);
        }
    }

    /// `to_component_array` is consumed by [`crate::cache::core::LiquidCache::try_read_squeezed_date32_array`]
    /// as the SQL fast path. The query plan still runs `date_part` on the returned array, so the
    /// values must round-trip through `component_from_days`: feeding a returned Date32 day-value
    /// back into `component_from_days(field, days)` must recover the original component.
    ///
    /// Before the encoding fix, the Year case returned `Date32(year_int)` (e.g. year 1970 became
    /// Date32 day-1970 = 1975-05-24), so re-extracting the year gave 1975 instead of 1970.
    #[test]
    fn to_component_array_date32_round_trips_through_extract() {
        let inputs: Vec<Option<i32>> = vec![
            Some(ymd_to_epoch_days(1970, 1, 1)),
            Some(ymd_to_epoch_days(1971, 7, 15)),
            Some(ymd_to_epoch_days(1999, 12, 31)),
            Some(ymd_to_epoch_days(2024, 2, 29)),
            Some(ymd_to_epoch_days(4709, 11, 24)),
            None,
        ];
        // Reference result: the year extracted directly from each input.
        let expected_components: Vec<Option<i32>> = inputs
            .iter()
            .map(|opt| opt.map(|d| component_from_days(Date32Field::Year, d)))
            .collect();

        let arr = dates(&inputs);
        let liquid = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(arr);
        let squeezed = SqueezedDate32Array::from_liquid_date32(&liquid, Date32Field::Year);
        let component = squeezed
            .to_component_array()
            .as_any()
            .downcast_ref::<PrimitiveArray<Date32Type>>()
            .expect("date32 component array")
            .clone();

        for (idx, expected) in expected_components.iter().enumerate() {
            match expected {
                Some(year) => {
                    assert!(!component.is_null(idx), "row {idx} unexpectedly null");
                    let recovered = component_from_days(Date32Field::Year, component.value(idx));
                    assert_eq!(
                        recovered, *year,
                        "row {idx}: extracting Year from to_component_array output recovered {recovered}, expected {year}",
                    );
                }
                None => assert!(component.is_null(idx), "row {idx} should be null"),
            }
        }
    }

    // Timestamp inputs keep their Arrow type in `to_component_array`, while
    // `to_component_date32` exposes the raw year values.
    #[test]
    fn test_timestamp_extraction() {
        // Two Microsecond timestamps at 2021-01-01 00:00:00 UTC and 2022-01-01 00:00:00 UTC.
        let input = vec![
            Some(1_609_459_200_000_000),
            Some(1_640_995_200_000_000),
            None,
        ];
        let arr = PrimitiveArray::<TimestampMicrosecondType>::from(input);
        let liquid = LiquidPrimitiveArray::<TimestampMicrosecondType>::from_arrow_array(arr);
        let squeezed = SqueezedDate32Array::from_liquid_timestamp(&liquid, Date32Field::Year);
        let component = squeezed.to_component_array();
        let out = component
            .as_any()
            .downcast_ref::<PrimitiveArray<TimestampMicrosecondType>>()
            .expect("timestamp array");

        // to_component_array returns Timestamps that round-trip through `date_part`:
        // year 2021 maps to (2021,1,1) at midnight UTC.
        let micros_per_day: i64 = 86_400_000_000;
        assert_eq!(
            out.value(0),
            ymd_to_epoch_days(2021, 1, 1) as i64 * micros_per_day,
        );
        assert_eq!(
            out.value(1),
            ymd_to_epoch_days(2022, 1, 1) as i64 * micros_per_day,
        );
        assert!(out.is_null(2));

        // Direct integer view is still available via `to_component_date32`.
        let int_view = squeezed.to_component_date32();
        assert_eq!(int_view.value(0), 2021);
        assert_eq!(int_view.value(1), 2022);
        assert!(int_view.is_null(2));
    }
}