liquid_cache/liquid_array/
variant_array.rs1use std::sync::Arc;
2
3use arrow::array::{Array, ArrayRef, BinaryViewArray, StructArray};
4use arrow::buffer::NullBuffer;
5use arrow_schema::{DataType, Field, Fields};
6
7use crate::liquid_array::{
8 LiquidArrayRef, LiquidDataType, LiquidSqueezedArray, NeedsBacking, SqueezedBacking,
9};
10use ahash::AHashMap;
11
/// Squeezed form of a variant column: one liquid array per extracted path, plus the
/// bookkeeping needed to rebuild a struct-shaped Arrow array from them.
#[derive(Debug)]
pub struct VariantStructSqueezedArray {
    /// Liquid arrays keyed by path. Paths are split on `.` into nested field names when the
    /// Arrow struct is rebuilt.
    values: AHashMap<Arc<str>, LiquidArrayRef>,
    /// Number of rows; every array in `values` is expected to have this length.
    len: usize,
    /// Optional validity buffer applied to the root struct when converting to Arrow.
    nulls: Option<NullBuffer>,
    /// Arrow data type reported by `original_arrow_data_type`.
    original_arrow_type: DataType,
}
20
21impl VariantStructSqueezedArray {
22 pub fn new(
24 values: Vec<(Arc<str>, LiquidArrayRef)>,
25 nulls: Option<NullBuffer>,
26 original_arrow_type: DataType,
27 ) -> Self {
28 let len = values.first().map(|(_, array)| array.len()).unwrap_or(0);
29 let mut map = AHashMap::with_capacity(values.len());
30 for (path, array) in values {
31 debug_assert_eq!(array.len(), len, "variant paths must share length");
32 map.insert(path, array);
33 }
34 Self {
35 values: map,
36 len,
37 nulls,
38 original_arrow_type,
39 }
40 }
41
42 fn build_root_struct(&self) -> StructArray {
43 let metadata = Arc::new(BinaryViewArray::from(vec![b"" as &[u8]; self.len])) as ArrayRef;
44 let value_placeholder = Arc::new(BinaryViewArray::new_null(self.len)) as ArrayRef;
45 let typed_struct = self.build_typed_struct();
46
47 let metadata_field = Arc::new(Field::new("metadata", DataType::BinaryView, false));
48 let value_field = Arc::new(Field::new("value", DataType::BinaryView, true));
49 let typed_field = Arc::new(Field::new(
50 "typed_value",
51 typed_struct.data_type().clone(),
52 true,
53 ));
54
55 StructArray::new(
56 Fields::from(vec![metadata_field, value_field, typed_field]),
57 vec![metadata, value_placeholder, typed_struct as ArrayRef],
58 self.nulls.clone(),
59 )
60 }
61
62 fn build_typed_struct(&self) -> Arc<StructArray> {
63 let mut root = VariantTreeNode::new(self.len);
64 for (path, array) in &self.values {
65 let segments: Vec<&str> = path
66 .split('.')
67 .filter(|segment| !segment.is_empty())
68 .collect();
69 if segments.is_empty() {
70 continue;
71 }
72 root.insert(&segments, array.to_arrow_array());
73 }
74 root.into_struct_array()
75 }
76
77 pub fn contains_path(&self, path: &str) -> bool {
79 self.values.contains_key(path)
80 }
81
82 pub fn to_arrow_array_with_paths<'a>(
85 &self,
86 paths: impl IntoIterator<Item = &'a str>,
87 ) -> Result<ArrayRef, NeedsBacking> {
88 let mut filtered: Vec<(Arc<str>, LiquidArrayRef)> = Vec::new();
89 for path in paths.into_iter() {
90 if let Some(array) = self.values.get(path) {
91 filtered.push((Arc::from(path.to_string()), array.clone()));
92 }
93 }
94
95 if filtered.is_empty() {
96 return Ok(Arc::new(self.build_root_struct()) as ArrayRef);
97 }
98
99 let filtered = VariantStructSqueezedArray::new(
100 filtered,
101 self.nulls.clone(),
102 self.original_arrow_type.clone(),
103 );
104 Ok(Arc::new(filtered.build_root_struct()) as ArrayRef)
105 }
106
107 pub fn typed_values(&self) -> Vec<(Arc<str>, LiquidArrayRef)> {
109 self.values
110 .iter()
111 .map(|(path, array)| (path.clone(), array.clone()))
112 .collect()
113 }
114
115 pub fn nulls(&self) -> Option<NullBuffer> {
117 self.nulls.clone()
118 }
119}
120
121#[async_trait::async_trait]
122impl LiquidSqueezedArray for VariantStructSqueezedArray {
123 fn as_any(&self) -> &dyn std::any::Any {
124 self
125 }
126
127 fn get_array_memory_size(&self) -> usize {
128 self.values
129 .values()
130 .map(|array| array.get_array_memory_size())
131 .sum()
132 }
133
134 fn len(&self) -> usize {
135 self.len
136 }
137
138 async fn to_arrow_array(&self) -> ArrayRef {
139 Arc::new(self.build_root_struct()) as ArrayRef
140 }
141
142 fn data_type(&self) -> LiquidDataType {
143 LiquidDataType::ByteArray
144 }
145
146 fn original_arrow_data_type(&self) -> DataType {
147 self.original_arrow_type.clone()
148 }
149
150 fn disk_backing(&self) -> SqueezedBacking {
151 SqueezedBacking::Arrow
152 }
153}
154
/// Node of the tree used to regroup dotted paths into nested struct fields.
#[derive(Default)]
struct VariantTreeNode {
    /// Row count used for the placeholder columns created around this node.
    len: usize,
    /// Array stored at this node when a path ends here.
    leaf: Option<ArrayRef>,
    /// Child nodes keyed by the next path segment.
    children: AHashMap<String, VariantTreeNode>,
}
161
162impl VariantTreeNode {
163 fn new(len: usize) -> Self {
164 Self {
165 len,
166 leaf: None,
167 children: AHashMap::new(),
168 }
169 }
170
171 fn insert(&mut self, segments: &[&str], values: ArrayRef) {
172 if segments.is_empty() {
173 self.leaf = Some(values);
174 return;
175 }
176 let (head, tail) = segments.split_first().unwrap();
177 self.children
178 .entry(head.to_string())
179 .or_insert_with(|| VariantTreeNode::new(self.len))
180 .insert(tail, values);
181 }
182
183 fn into_struct_array(self) -> Arc<StructArray> {
184 let mut fields = Vec::with_capacity(self.children.len());
185 let mut arrays = Vec::with_capacity(self.children.len());
186 let mut entries: Vec<_> = self.children.into_iter().collect();
187 entries.sort_by(|a, b| a.0.cmp(&b.0));
188 for (name, child) in entries {
189 let field_array = child.into_field_array();
190 fields.push(Arc::new(Field::new(
191 name.as_str(),
192 field_array.data_type().clone(),
193 false,
194 )));
195 arrays.push(field_array);
196 }
197 Arc::new(StructArray::new(Fields::from(fields), arrays, None))
198 }
199
200 fn into_field_array(self) -> ArrayRef {
201 let len = self.len;
202 if self.children.is_empty() {
203 let values = self.leaf.expect("variant leaf value present");
204 wrap_typed_value(len, values)
205 } else {
206 let typed_struct = self.into_struct_array() as ArrayRef;
207 wrap_typed_value(len, typed_struct)
208 }
209 }
210}
211
212fn wrap_typed_value(len: usize, values: ArrayRef) -> ArrayRef {
213 let placeholder = Arc::new(BinaryViewArray::new_null(len)) as ArrayRef;
214 Arc::new(StructArray::new(
215 Fields::from(vec![
216 Arc::new(Field::new("value", DataType::BinaryView, true)),
217 Arc::new(Field::new("typed_value", values.data_type().clone(), true)),
218 ]),
219 vec![placeholder, values],
220 None,
221 )) as ArrayRef
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int64Array, StringArray};
    use arrow_schema::DataType;

    use crate::liquid_array::{LiquidByteArray, LiquidPrimitiveArray};

    /// Requesting one stored path must yield a `typed_value` struct containing only that field.
    #[test]
    fn to_arrow_array_with_paths_prunes_extra_fields() {
        // One-row string column stored under "did".
        let did_liquid: LiquidArrayRef = {
            let did_arrow = StringArray::from(vec![Some("d")]);
            let (_compressor, liquid) = LiquidByteArray::train_from_arrow(&did_arrow);
            Arc::new(liquid)
        };

        // One-row integer column stored under "time_us".
        let time_liquid: LiquidArrayRef = {
            let time_arrow = Int64Array::from(vec![1_i64]);
            Arc::new(
                LiquidPrimitiveArray::<arrow::datatypes::Int64Type>::from_arrow_array(time_arrow),
            )
        };

        let entries = vec![
            (Arc::from("did"), did_liquid),
            (Arc::from("time_us"), time_liquid),
        ];
        let original_type = DataType::Struct(Fields::from(Vec::<Arc<Field>>::new()));
        let squeezed = VariantStructSqueezedArray::new(entries, None, original_type);

        let array = squeezed
            .to_arrow_array_with_paths(["time_us"])
            .expect("arrow array");
        let root = array
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("struct root");
        let typed_column = root.column_by_name("typed_value").unwrap();
        let typed_value = typed_column
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        let mut field_names = Vec::new();
        for field in typed_value.fields().iter() {
            field_names.push(field.name().clone());
        }
        assert_eq!(field_names, vec!["time_us"]);
    }
}