vortex_sequence/
serde.rs

1use vortex_array::serde::ArrayChildren;
2use vortex_array::vtable::{EncodeVTable, SerdeVTable};
3use vortex_array::{Canonical, DeserializeMetadata, ProstMetadata};
4use vortex_buffer::ByteBuffer;
5use vortex_dtype::DType;
6use vortex_dtype::Nullability::NonNullable;
7use vortex_error::{VortexExpect, VortexResult, vortex_err};
8use vortex_proto::scalar::ScalarValue;
9use vortex_scalar::Scalar;
10
11use crate::array::{SequenceArray, SequenceEncoding, SequenceVTable};
12
13#[derive(Clone, prost::Message)]
14pub struct SequenceMetadata {
15    #[prost(message, tag = "1")]
16    base: Option<ScalarValue>,
17    #[prost(message, tag = "2")]
18    multiplier: Option<ScalarValue>,
19}
20
21impl EncodeVTable<SequenceVTable> for SequenceVTable {
22    fn encode(
23        _encoding: &SequenceEncoding,
24        _canonical: &Canonical,
25        _like: Option<&SequenceArray>,
26    ) -> VortexResult<Option<SequenceArray>> {
27        // TODO(joe): hook up compressor
28        Ok(None)
29    }
30}
31
32impl SerdeVTable<SequenceVTable> for SequenceVTable {
33    type Metadata = ProstMetadata<SequenceMetadata>;
34
35    fn metadata(array: &SequenceArray) -> VortexResult<Option<Self::Metadata>> {
36        Ok(Some(ProstMetadata(SequenceMetadata {
37            base: Some((&array.base()).into()),
38            multiplier: Some((&array.multiplier()).into()),
39        })))
40    }
41
42    fn build(
43        _encoding: &SequenceEncoding,
44        dtype: &DType,
45        len: usize,
46        metadata: &<Self::Metadata as DeserializeMetadata>::Output,
47        _buffers: &[ByteBuffer],
48        _children: &dyn ArrayChildren,
49    ) -> VortexResult<SequenceArray> {
50        let ptype = dtype.as_ptype();
51
52        // We go via scalar to cast the scalar values into the correct PType
53        let base = Scalar::new(
54            DType::Primitive(ptype, NonNullable),
55            metadata
56                .base
57                .as_ref()
58                .ok_or_else(|| vortex_err!("base required"))?
59                .try_into()?,
60        )
61        .as_primitive()
62        .pvalue()
63        .vortex_expect("non-nullable primitive");
64
65        let multiplier = Scalar::new(
66            DType::Primitive(ptype, NonNullable),
67            metadata
68                .multiplier
69                .as_ref()
70                .ok_or_else(|| vortex_err!("base required"))?
71                .try_into()?,
72        )
73        .as_primitive()
74        .pvalue()
75        .vortex_expect("non-nullable primitive");
76
77        Ok(SequenceArray::unchecked_new(base, multiplier, ptype, len))
78    }
79}
80
81#[cfg(test)]
82mod tests {
83    use std::sync::Arc;
84
85    use arcref::ArcRef;
86    use vortex_array::arrays::{PrimitiveArray, StructArray};
87    use vortex_array::stream::ArrayStreamExt;
88    use vortex_expr::{get_item, root};
89    use vortex_file::{VortexOpenOptions, VortexWriteOptions};
90    use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
91
92    use crate::SequenceArray;
93
94    #[tokio::test]
95    async fn round_trip_seq() {
96        let seq = SequenceArray::typed_new(2i8, 3, 4).unwrap();
97        let st = StructArray::from_fields(&[("a", seq.to_array())]).unwrap();
98
99        let file = tokio::fs::File::create("/tmp/abc.vx").await.unwrap();
100        VortexWriteOptions::default()
101            .with_strategy(ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())))
102            .write(file, st.to_array_stream())
103            .await
104            .unwrap();
105
106        let file = VortexOpenOptions::file().open("/tmp/abc.vx").await.unwrap();
107        let array = file
108            .scan()
109            .unwrap()
110            .with_projection(get_item("a", root()))
111            .into_array_stream()
112            .unwrap()
113            .read_all()
114            .await
115            .unwrap();
116
117        let canon = PrimitiveArray::from_iter((0..4).map(|i| 2i8 + i * 3));
118
119        assert_eq!(
120            array
121                .to_canonical()
122                .unwrap()
123                .into_primitive()
124                .unwrap()
125                .as_slice::<i8>(),
126            canon.as_slice::<i8>()
127        )
128    }
129}