Skip to main content

liquid_cache/liquid_array/
variant_array.rs

1use std::sync::Arc;
2
3use arrow::array::{Array, ArrayRef, BinaryViewArray, StructArray};
4use arrow::buffer::NullBuffer;
5use arrow_schema::{DataType, Field, Fields};
6
7use crate::liquid_array::{
8    LiquidArrayRef, LiquidDataType, LiquidSqueezedArray, NeedsBacking, SqueezedBacking,
9};
10use ahash::AHashMap;
11
12/// Squeezed representation for variant arrays that contain multiple typed fields.
13#[derive(Debug)]
14pub struct VariantStructSqueezedArray {
15    values: AHashMap<Arc<str>, LiquidArrayRef>,
16    len: usize,
17    nulls: Option<NullBuffer>,
18    original_arrow_type: DataType,
19    disk_backing_size: usize,
20}
21
22impl VariantStructSqueezedArray {
23    /// Create a squeezed representation that keeps only the typed variant columns resident.
24    pub fn new(
25        values: Vec<(Arc<str>, LiquidArrayRef)>,
26        nulls: Option<NullBuffer>,
27        original_arrow_type: DataType,
28        disk_backing_size: usize,
29    ) -> Self {
30        let len = values.first().map(|(_, array)| array.len()).unwrap_or(0);
31        let mut map = AHashMap::with_capacity(values.len());
32        for (path, array) in values {
33            debug_assert_eq!(array.len(), len, "variant paths must share length");
34            map.insert(path, array);
35        }
36        Self {
37            values: map,
38            len,
39            nulls,
40            original_arrow_type,
41            disk_backing_size,
42        }
43    }
44
45    fn build_root_struct(&self) -> StructArray {
46        let metadata = Arc::new(BinaryViewArray::from(vec![b"" as &[u8]; self.len])) as ArrayRef;
47        let value_placeholder = Arc::new(BinaryViewArray::new_null(self.len)) as ArrayRef;
48        let typed_struct = self.build_typed_struct();
49
50        let metadata_field = Arc::new(Field::new("metadata", DataType::BinaryView, false));
51        let value_field = Arc::new(Field::new("value", DataType::BinaryView, true));
52        let typed_field = Arc::new(Field::new(
53            "typed_value",
54            typed_struct.data_type().clone(),
55            true,
56        ));
57
58        StructArray::new(
59            Fields::from(vec![metadata_field, value_field, typed_field]),
60            vec![metadata, value_placeholder, typed_struct as ArrayRef],
61            self.nulls.clone(),
62        )
63    }
64
65    fn build_typed_struct(&self) -> Arc<StructArray> {
66        let mut root = VariantTreeNode::new(self.len);
67        for (path, array) in &self.values {
68            let segments: Vec<&str> = path
69                .split('.')
70                .filter(|segment| !segment.is_empty())
71                .collect();
72            if segments.is_empty() {
73                continue;
74            }
75            root.insert(&segments, array.to_arrow_array());
76        }
77        root.into_struct_array()
78    }
79
80    /// Returns true if the squeezed contains the provided variant path.
81    pub fn contains_path(&self, path: &str) -> bool {
82        self.values.contains_key(path)
83    }
84
85    /// Build an Arrow array that includes only the provided variant paths.
86    /// If `paths` is empty or none match, it falls back to the full array.
87    pub fn to_arrow_array_with_paths<'a>(
88        &self,
89        paths: impl IntoIterator<Item = &'a str>,
90    ) -> Result<ArrayRef, NeedsBacking> {
91        let mut filtered: Vec<(Arc<str>, LiquidArrayRef)> = Vec::new();
92        for path in paths.into_iter() {
93            if let Some(array) = self.values.get(path) {
94                filtered.push((Arc::from(path.to_string()), array.clone()));
95            }
96        }
97
98        if filtered.is_empty() {
99            return Ok(Arc::new(self.build_root_struct()) as ArrayRef);
100        }
101
102        let filtered = VariantStructSqueezedArray::new(
103            filtered,
104            self.nulls.clone(),
105            self.original_arrow_type.clone(),
106            self.disk_backing_size,
107        );
108        Ok(Arc::new(filtered.build_root_struct()) as ArrayRef)
109    }
110
111    /// Clone the stored typed values keyed by variant path.
112    pub fn typed_values(&self) -> Vec<(Arc<str>, LiquidArrayRef)> {
113        self.values
114            .iter()
115            .map(|(path, array)| (path.clone(), array.clone()))
116            .collect()
117    }
118
119    /// Null buffer shared by all stored paths, if present.
120    pub fn nulls(&self) -> Option<NullBuffer> {
121        self.nulls.clone()
122    }
123}
124
125#[async_trait::async_trait]
126impl LiquidSqueezedArray for VariantStructSqueezedArray {
127    fn as_any(&self) -> &dyn std::any::Any {
128        self
129    }
130
131    fn get_array_memory_size(&self) -> usize {
132        self.values
133            .values()
134            .map(|array| array.get_array_memory_size())
135            .sum()
136    }
137
138    fn len(&self) -> usize {
139        self.len
140    }
141
142    async fn to_arrow_array(&self) -> ArrayRef {
143        Arc::new(self.build_root_struct()) as ArrayRef
144    }
145
146    fn data_type(&self) -> LiquidDataType {
147        LiquidDataType::ByteViewArray
148    }
149
150    fn original_arrow_data_type(&self) -> DataType {
151        self.original_arrow_type.clone()
152    }
153
154    fn disk_backing(&self) -> SqueezedBacking {
155        SqueezedBacking::Arrow(self.disk_backing_size)
156    }
157}
158
159#[derive(Default)]
160struct VariantTreeNode {
161    len: usize,
162    leaf: Option<ArrayRef>,
163    children: AHashMap<String, VariantTreeNode>,
164}
165
166impl VariantTreeNode {
167    fn new(len: usize) -> Self {
168        Self {
169            len,
170            leaf: None,
171            children: AHashMap::new(),
172        }
173    }
174
175    fn insert(&mut self, segments: &[&str], values: ArrayRef) {
176        if segments.is_empty() {
177            self.leaf = Some(values);
178            return;
179        }
180        let (head, tail) = segments.split_first().unwrap();
181        self.children
182            .entry(head.to_string())
183            .or_insert_with(|| VariantTreeNode::new(self.len))
184            .insert(tail, values);
185    }
186
187    fn into_struct_array(self) -> Arc<StructArray> {
188        let mut fields = Vec::with_capacity(self.children.len());
189        let mut arrays = Vec::with_capacity(self.children.len());
190        let mut entries: Vec<_> = self.children.into_iter().collect();
191        entries.sort_by(|a, b| a.0.cmp(&b.0));
192        for (name, child) in entries {
193            let field_array = child.into_field_array();
194            fields.push(Arc::new(Field::new(
195                name.as_str(),
196                field_array.data_type().clone(),
197                false,
198            )));
199            arrays.push(field_array);
200        }
201        Arc::new(StructArray::new(Fields::from(fields), arrays, None))
202    }
203
204    fn into_field_array(self) -> ArrayRef {
205        let len = self.len;
206        if self.children.is_empty() {
207            let values = self.leaf.expect("variant leaf value present");
208            wrap_typed_value(len, values)
209        } else {
210            let typed_struct = self.into_struct_array() as ArrayRef;
211            wrap_typed_value(len, typed_struct)
212        }
213    }
214}
215
216fn wrap_typed_value(len: usize, values: ArrayRef) -> ArrayRef {
217    let placeholder = Arc::new(BinaryViewArray::new_null(len)) as ArrayRef;
218    Arc::new(StructArray::new(
219        Fields::from(vec![
220            Arc::new(Field::new("value", DataType::BinaryView, true)),
221            Arc::new(Field::new("typed_value", values.data_type().clone(), true)),
222        ]),
223        vec![placeholder, values],
224        None,
225    )) as ArrayRef
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int64Array, StringArray};
    use arrow_schema::DataType;

    use crate::liquid_array::{LiquidByteViewArray, LiquidPrimitiveArray, raw::FsstArray};

    /// Requesting a single path must leave only that field in `typed_value`.
    #[test]
    fn to_arrow_array_with_paths_prunes_extra_fields() {
        // Typed path `did`: a one-row utf8 column.
        let did_strings = StringArray::from(vec![Some("d")]);
        let (_compressor, did_column) =
            LiquidByteViewArray::<FsstArray>::train_from_arrow(&did_strings);
        let did_column: LiquidArrayRef = Arc::new(did_column);

        // Typed path `time_us`: a one-row int64 column.
        let time_ints = Int64Array::from(vec![1_i64]);
        let time_column =
            LiquidPrimitiveArray::<arrow::datatypes::Int64Type>::from_arrow_array(time_ints);
        let time_column: LiquidArrayRef = Arc::new(time_column);

        let typed_paths = vec![
            (Arc::from("did"), did_column),
            (Arc::from("time_us"), time_column),
        ];
        let empty_struct_type = DataType::Struct(Fields::from(Vec::<Arc<Field>>::new()));
        let squeezed = VariantStructSqueezedArray::new(typed_paths, None, empty_struct_type, 0);

        // Ask for `time_us` only; `did` must not appear in the result.
        let pruned = squeezed
            .to_arrow_array_with_paths(["time_us"])
            .expect("arrow array");
        let root = pruned
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("struct root");
        let typed_column = root.column_by_name("typed_value").unwrap();
        let typed_value = typed_column
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        let mut field_names = Vec::new();
        for field in typed_value.fields().iter() {
            field_names.push(field.name().clone());
        }
        assert_eq!(field_names, vec!["time_us"]);
    }
}