// vortex_array/array/extension/compute/to_arrow.rs
use std::sync::Arc;

use arrow_array::{
    ArrayRef, Date32Array, Date64Array, Time32MillisecondArray, Time32SecondArray,
    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
};
use arrow_schema::DataType;
use vortex_datetime_dtype::{is_temporal_ext_type, TemporalMetadata, TimeUnit};
use vortex_dtype::{DType, NativePType};
use vortex_error::{vortex_bail, VortexResult};

use crate::array::{ExtensionArray, ExtensionEncoding, TemporalArray};
use crate::canonical::IntoArrayVariant;
use crate::compute::{to_arrow, try_cast, ToArrowFn};
use crate::IntoArray;

impl ToArrowFn<ExtensionArray> for ExtensionEncoding {
    /// Converts an extension array into an Arrow array.
    ///
    /// If the array's extension id is one of the temporal extension types, the array is
    /// rebuilt as a [`TemporalArray`] and converted to the matching Arrow temporal array.
    /// The requested `data_type` is not consulted on that path.
    ///
    /// Any other extension type is converted by passing its storage array and the
    /// requested `data_type` to [`to_arrow`]. The extension type information is dropped in
    /// that case, so such arrays do not round-trip
    /// (see <https://github.com/spiraldb/vortex/issues/1167>).
    ///
    /// On success this always returns `Some`.
    fn to_arrow(
        &self,
        array: &ExtensionArray,
        data_type: &DataType,
    ) -> VortexResult<Option<ArrayRef>> {
        if !is_temporal_ext_type(array.id()) {
            // TODO(aduffy): https://github.com/spiraldb/vortex/issues/1167
            // Fall back to the storage array; the extension type is lost here.
            return to_arrow(array.storage(), data_type).map(Some);
        }

        // NOTE(ngates): special-casing temporal types here is awkward, but acceptable given
        // how tightly integrated we are with Arrow.
        let temporal = TemporalArray::try_from(array.clone().into_array())?;
        let arrow_array = temporal_to_arrow(temporal)?;
        Ok(Some(arrow_array))
    }
}

/// Converts a [`TemporalArray`] into the equivalent Arrow temporal array.
///
/// The underlying values are cast to the native width required by the target Arrow type:
/// `i32` for `Date32`/`Time32`, and `i64` for `Date64`/`Time64`/`Timestamp`. The cast keeps
/// the original nullability. The logical validity of the cast values becomes the Arrow
/// null buffer.
///
/// Mapping:
/// - `Date(D)` → `Date32Array`, `Date(Ms)` → `Date64Array`
/// - `Time(S)` / `Time(Ms)` → `Time32SecondArray` / `Time32MillisecondArray`
/// - `Time(Us)` / `Time(Ns)` → `Time64MicrosecondArray` / `Time64NanosecondArray`
/// - `Timestamp(unit, tz)` → the `Timestamp*Array` for `unit`, carrying time zone `tz`
///
/// # Errors
///
/// Fails if the time unit is not valid for the temporal kind (e.g. `Date(Ns)`), or if
/// casting or canonicalizing the underlying values fails.
fn temporal_to_arrow(temporal_array: TemporalArray) -> VortexResult<ArrayRef> {
    // Casts `$values` to a primitive array of `$prim` (keeping its nullability), then
    // splits it into an Arrow scalar buffer and an optional Arrow null buffer.
    macro_rules! extract_temporal_values {
        ($values:expr, $prim:ty) => {{
            // Evaluate the argument exactly once. It is used both as the cast input and
            // to read its nullability.
            let values = $values;
            let temporal_values = try_cast(
                values,
                &DType::Primitive(<$prim as NativePType>::PTYPE, values.dtype().nullability()),
            )?
            .into_primitive()?;
            let nulls = temporal_values.logical_validity()?.to_null_buffer();
            let scalars = temporal_values.into_buffer().into_arrow_scalar_buffer();

            (scalars, nulls)
        }};
    }

    let raw_values = temporal_array.temporal_values();

    Ok(match temporal_array.temporal_metadata() {
        TemporalMetadata::Date(time_unit) => match time_unit {
            TimeUnit::D => {
                let (scalars, nulls) = extract_temporal_values!(&raw_values, i32);
                Arc::new(Date32Array::new(scalars, nulls))
            }
            TimeUnit::Ms => {
                let (scalars, nulls) = extract_temporal_values!(&raw_values, i64);
                Arc::new(Date64Array::new(scalars, nulls))
            }
            _ => vortex_bail!(
                "Invalid TimeUnit {time_unit} for {}",
                temporal_array.ext_dtype().id()
            ),
        },
        TemporalMetadata::Time(time_unit) => match time_unit {
            TimeUnit::S => {
                let (scalars, nulls) = extract_temporal_values!(&raw_values, i32);
                Arc::new(Time32SecondArray::new(scalars, nulls))
            }
            TimeUnit::Ms => {
                let (scalars, nulls) = extract_temporal_values!(&raw_values, i32);
                Arc::new(Time32MillisecondArray::new(scalars, nulls))
            }
            TimeUnit::Us => {
                let (scalars, nulls) = extract_temporal_values!(&raw_values, i64);
                Arc::new(Time64MicrosecondArray::new(scalars, nulls))
            }
            TimeUnit::Ns => {
                let (scalars, nulls) = extract_temporal_values!(&raw_values, i64);
                Arc::new(Time64NanosecondArray::new(scalars, nulls))
            }
            _ => vortex_bail!(
                "Invalid TimeUnit {time_unit} for {}",
                temporal_array.ext_dtype().id()
            ),
        },
        TemporalMetadata::Timestamp(time_unit, time_zone) => {
            let (scalars, nulls) = extract_temporal_values!(&raw_values, i64);
            // Attach the time zone (if any) to the Arrow array. Without it, the Arrow
            // timestamp would be zone-less and the zone information would be lost.
            let time_zone = time_zone.clone();
            match time_unit {
                TimeUnit::Ns => Arc::new(
                    TimestampNanosecondArray::new(scalars, nulls).with_timezone_opt(time_zone),
                ),
                TimeUnit::Us => Arc::new(
                    TimestampMicrosecondArray::new(scalars, nulls).with_timezone_opt(time_zone),
                ),
                TimeUnit::Ms => Arc::new(
                    TimestampMillisecondArray::new(scalars, nulls).with_timezone_opt(time_zone),
                ),
                TimeUnit::S => Arc::new(
                    TimestampSecondArray::new(scalars, nulls).with_timezone_opt(time_zone),
                ),
                _ => vortex_bail!(
                    "Invalid TimeUnit {time_unit} for {}",
                    temporal_array.ext_dtype().id()
                ),
            }
        }
    })
}