Skip to main content

liquid_cache/liquid_array/
variant_array.rs

1use std::sync::Arc;
2
3use arrow::array::{Array, ArrayRef, BinaryViewArray, StructArray};
4use arrow::buffer::NullBuffer;
5use arrow_schema::{DataType, Field, Fields};
6
7use crate::liquid_array::{
8    LiquidArrayRef, LiquidDataType, LiquidSqueezedArray, NeedsBacking, SqueezedBacking,
9};
10use ahash::AHashMap;
11
12/// Squeezed representation for variant arrays that contain multiple typed fields.
13#[derive(Debug)]
14pub struct VariantStructSqueezedArray {
15    values: AHashMap<Arc<str>, LiquidArrayRef>,
16    len: usize,
17    nulls: Option<NullBuffer>,
18    original_arrow_type: DataType,
19}
20
21impl VariantStructSqueezedArray {
22    /// Create a squeezed representation that keeps only the typed variant columns resident.
23    pub fn new(
24        values: Vec<(Arc<str>, LiquidArrayRef)>,
25        nulls: Option<NullBuffer>,
26        original_arrow_type: DataType,
27    ) -> Self {
28        let len = values.first().map(|(_, array)| array.len()).unwrap_or(0);
29        let mut map = AHashMap::with_capacity(values.len());
30        for (path, array) in values {
31            debug_assert_eq!(array.len(), len, "variant paths must share length");
32            map.insert(path, array);
33        }
34        Self {
35            values: map,
36            len,
37            nulls,
38            original_arrow_type,
39        }
40    }
41
42    fn build_root_struct(&self) -> StructArray {
43        let metadata = Arc::new(BinaryViewArray::from(vec![b"" as &[u8]; self.len])) as ArrayRef;
44        let value_placeholder = Arc::new(BinaryViewArray::new_null(self.len)) as ArrayRef;
45        let typed_struct = self.build_typed_struct();
46
47        let metadata_field = Arc::new(Field::new("metadata", DataType::BinaryView, false));
48        let value_field = Arc::new(Field::new("value", DataType::BinaryView, true));
49        let typed_field = Arc::new(Field::new(
50            "typed_value",
51            typed_struct.data_type().clone(),
52            true,
53        ));
54
55        StructArray::new(
56            Fields::from(vec![metadata_field, value_field, typed_field]),
57            vec![metadata, value_placeholder, typed_struct as ArrayRef],
58            self.nulls.clone(),
59        )
60    }
61
62    fn build_typed_struct(&self) -> Arc<StructArray> {
63        let mut root = VariantTreeNode::new(self.len);
64        for (path, array) in &self.values {
65            let segments: Vec<&str> = path
66                .split('.')
67                .filter(|segment| !segment.is_empty())
68                .collect();
69            if segments.is_empty() {
70                continue;
71            }
72            root.insert(&segments, array.to_arrow_array());
73        }
74        root.into_struct_array()
75    }
76
77    /// Returns true if the squeezed contains the provided variant path.
78    pub fn contains_path(&self, path: &str) -> bool {
79        self.values.contains_key(path)
80    }
81
82    /// Build an Arrow array that includes only the provided variant paths.
83    /// If `paths` is empty or none match, it falls back to the full array.
84    pub fn to_arrow_array_with_paths<'a>(
85        &self,
86        paths: impl IntoIterator<Item = &'a str>,
87    ) -> Result<ArrayRef, NeedsBacking> {
88        let mut filtered: Vec<(Arc<str>, LiquidArrayRef)> = Vec::new();
89        for path in paths.into_iter() {
90            if let Some(array) = self.values.get(path) {
91                filtered.push((Arc::from(path.to_string()), array.clone()));
92            }
93        }
94
95        if filtered.is_empty() {
96            return Ok(Arc::new(self.build_root_struct()) as ArrayRef);
97        }
98
99        let filtered = VariantStructSqueezedArray::new(
100            filtered,
101            self.nulls.clone(),
102            self.original_arrow_type.clone(),
103        );
104        Ok(Arc::new(filtered.build_root_struct()) as ArrayRef)
105    }
106
107    /// Clone the stored typed values keyed by variant path.
108    pub fn typed_values(&self) -> Vec<(Arc<str>, LiquidArrayRef)> {
109        self.values
110            .iter()
111            .map(|(path, array)| (path.clone(), array.clone()))
112            .collect()
113    }
114
115    /// Null buffer shared by all stored paths, if present.
116    pub fn nulls(&self) -> Option<NullBuffer> {
117        self.nulls.clone()
118    }
119}
120
121#[async_trait::async_trait]
122impl LiquidSqueezedArray for VariantStructSqueezedArray {
123    fn as_any(&self) -> &dyn std::any::Any {
124        self
125    }
126
127    fn get_array_memory_size(&self) -> usize {
128        self.values
129            .values()
130            .map(|array| array.get_array_memory_size())
131            .sum()
132    }
133
134    fn len(&self) -> usize {
135        self.len
136    }
137
138    async fn to_arrow_array(&self) -> ArrayRef {
139        Arc::new(self.build_root_struct()) as ArrayRef
140    }
141
142    fn data_type(&self) -> LiquidDataType {
143        LiquidDataType::ByteArray
144    }
145
146    fn original_arrow_data_type(&self) -> DataType {
147        self.original_arrow_type.clone()
148    }
149
150    fn disk_backing(&self) -> SqueezedBacking {
151        SqueezedBacking::Arrow
152    }
153}
154
155#[derive(Default)]
156struct VariantTreeNode {
157    len: usize,
158    leaf: Option<ArrayRef>,
159    children: AHashMap<String, VariantTreeNode>,
160}
161
162impl VariantTreeNode {
163    fn new(len: usize) -> Self {
164        Self {
165            len,
166            leaf: None,
167            children: AHashMap::new(),
168        }
169    }
170
171    fn insert(&mut self, segments: &[&str], values: ArrayRef) {
172        if segments.is_empty() {
173            self.leaf = Some(values);
174            return;
175        }
176        let (head, tail) = segments.split_first().unwrap();
177        self.children
178            .entry(head.to_string())
179            .or_insert_with(|| VariantTreeNode::new(self.len))
180            .insert(tail, values);
181    }
182
183    fn into_struct_array(self) -> Arc<StructArray> {
184        let mut fields = Vec::with_capacity(self.children.len());
185        let mut arrays = Vec::with_capacity(self.children.len());
186        let mut entries: Vec<_> = self.children.into_iter().collect();
187        entries.sort_by(|a, b| a.0.cmp(&b.0));
188        for (name, child) in entries {
189            let field_array = child.into_field_array();
190            fields.push(Arc::new(Field::new(
191                name.as_str(),
192                field_array.data_type().clone(),
193                false,
194            )));
195            arrays.push(field_array);
196        }
197        Arc::new(StructArray::new(Fields::from(fields), arrays, None))
198    }
199
200    fn into_field_array(self) -> ArrayRef {
201        let len = self.len;
202        if self.children.is_empty() {
203            let values = self.leaf.expect("variant leaf value present");
204            wrap_typed_value(len, values)
205        } else {
206            let typed_struct = self.into_struct_array() as ArrayRef;
207            wrap_typed_value(len, typed_struct)
208        }
209    }
210}
211
212fn wrap_typed_value(len: usize, values: ArrayRef) -> ArrayRef {
213    let placeholder = Arc::new(BinaryViewArray::new_null(len)) as ArrayRef;
214    Arc::new(StructArray::new(
215        Fields::from(vec![
216            Arc::new(Field::new("value", DataType::BinaryView, true)),
217            Arc::new(Field::new("typed_value", values.data_type().clone(), true)),
218        ]),
219        vec![placeholder, values],
220        None,
221    )) as ArrayRef
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int64Array, StringArray};
    use arrow::datatypes::Int64Type;
    use arrow_schema::DataType;

    use crate::liquid_array::{LiquidByteArray, LiquidPrimitiveArray};

    /// Single-row squeezed variant with two typed paths: `did` (Utf8) and `time_us` (Int64).
    fn two_path_variant() -> VariantStructSqueezedArray {
        let (_compressor, did) =
            LiquidByteArray::train_from_arrow(&StringArray::from(vec![Some("d")]));
        let did: LiquidArrayRef = Arc::new(did);

        let time_us: LiquidArrayRef = Arc::new(
            LiquidPrimitiveArray::<Int64Type>::from_arrow_array(Int64Array::from(vec![1_i64])),
        );

        VariantStructSqueezedArray::new(
            vec![(Arc::from("did"), did), (Arc::from("time_us"), time_us)],
            None,
            DataType::Struct(Fields::empty()),
        )
    }

    #[test]
    fn to_arrow_array_with_paths_prunes_extra_fields() {
        let squeezed = two_path_variant();

        // Only `time_us` is requested, so `did` must not appear under `typed_value`.
        let pruned = squeezed
            .to_arrow_array_with_paths(["time_us"])
            .expect("arrow array");
        let root = pruned
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("struct root");
        let typed_value = root
            .column_by_name("typed_value")
            .unwrap()
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        let names: Vec<&str> = typed_value
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(names, ["time_us"]);
    }
}