liquid_cache/liquid_array/
variant_array.rs1use std::sync::Arc;
2
3use arrow::array::{Array, ArrayRef, BinaryViewArray, StructArray};
4use arrow::buffer::NullBuffer;
5use arrow_schema::{DataType, Field, Fields};
6
7use crate::liquid_array::{
8 LiquidArrayRef, LiquidDataType, LiquidSqueezedArray, NeedsBacking, SqueezedBacking,
9};
10use ahash::AHashMap;
11
12#[derive(Debug)]
14pub struct VariantStructSqueezedArray {
15 values: AHashMap<Arc<str>, LiquidArrayRef>,
16 len: usize,
17 nulls: Option<NullBuffer>,
18 original_arrow_type: DataType,
19 disk_backing_size: usize,
20}
21
22impl VariantStructSqueezedArray {
23 pub fn new(
25 values: Vec<(Arc<str>, LiquidArrayRef)>,
26 nulls: Option<NullBuffer>,
27 original_arrow_type: DataType,
28 disk_backing_size: usize,
29 ) -> Self {
30 let len = values.first().map(|(_, array)| array.len()).unwrap_or(0);
31 let mut map = AHashMap::with_capacity(values.len());
32 for (path, array) in values {
33 debug_assert_eq!(array.len(), len, "variant paths must share length");
34 map.insert(path, array);
35 }
36 Self {
37 values: map,
38 len,
39 nulls,
40 original_arrow_type,
41 disk_backing_size,
42 }
43 }
44
45 fn build_root_struct(&self) -> StructArray {
46 let metadata = Arc::new(BinaryViewArray::from(vec![b"" as &[u8]; self.len])) as ArrayRef;
47 let value_placeholder = Arc::new(BinaryViewArray::new_null(self.len)) as ArrayRef;
48 let typed_struct = self.build_typed_struct();
49
50 let metadata_field = Arc::new(Field::new("metadata", DataType::BinaryView, false));
51 let value_field = Arc::new(Field::new("value", DataType::BinaryView, true));
52 let typed_field = Arc::new(Field::new(
53 "typed_value",
54 typed_struct.data_type().clone(),
55 true,
56 ));
57
58 StructArray::new(
59 Fields::from(vec![metadata_field, value_field, typed_field]),
60 vec![metadata, value_placeholder, typed_struct as ArrayRef],
61 self.nulls.clone(),
62 )
63 }
64
65 fn build_typed_struct(&self) -> Arc<StructArray> {
66 let mut root = VariantTreeNode::new(self.len);
67 for (path, array) in &self.values {
68 let segments: Vec<&str> = path
69 .split('.')
70 .filter(|segment| !segment.is_empty())
71 .collect();
72 if segments.is_empty() {
73 continue;
74 }
75 root.insert(&segments, array.to_arrow_array());
76 }
77 root.into_struct_array()
78 }
79
80 pub fn contains_path(&self, path: &str) -> bool {
82 self.values.contains_key(path)
83 }
84
85 pub fn to_arrow_array_with_paths<'a>(
88 &self,
89 paths: impl IntoIterator<Item = &'a str>,
90 ) -> Result<ArrayRef, NeedsBacking> {
91 let mut filtered: Vec<(Arc<str>, LiquidArrayRef)> = Vec::new();
92 for path in paths.into_iter() {
93 if let Some(array) = self.values.get(path) {
94 filtered.push((Arc::from(path.to_string()), array.clone()));
95 }
96 }
97
98 if filtered.is_empty() {
99 return Ok(Arc::new(self.build_root_struct()) as ArrayRef);
100 }
101
102 let filtered = VariantStructSqueezedArray::new(
103 filtered,
104 self.nulls.clone(),
105 self.original_arrow_type.clone(),
106 self.disk_backing_size,
107 );
108 Ok(Arc::new(filtered.build_root_struct()) as ArrayRef)
109 }
110
111 pub fn typed_values(&self) -> Vec<(Arc<str>, LiquidArrayRef)> {
113 self.values
114 .iter()
115 .map(|(path, array)| (path.clone(), array.clone()))
116 .collect()
117 }
118
119 pub fn nulls(&self) -> Option<NullBuffer> {
121 self.nulls.clone()
122 }
123}
124
125#[async_trait::async_trait]
126impl LiquidSqueezedArray for VariantStructSqueezedArray {
127 fn as_any(&self) -> &dyn std::any::Any {
128 self
129 }
130
131 fn get_array_memory_size(&self) -> usize {
132 self.values
133 .values()
134 .map(|array| array.get_array_memory_size())
135 .sum()
136 }
137
138 fn len(&self) -> usize {
139 self.len
140 }
141
142 async fn to_arrow_array(&self) -> ArrayRef {
143 Arc::new(self.build_root_struct()) as ArrayRef
144 }
145
146 fn data_type(&self) -> LiquidDataType {
147 LiquidDataType::ByteViewArray
148 }
149
150 fn original_arrow_data_type(&self) -> DataType {
151 self.original_arrow_type.clone()
152 }
153
154 fn disk_backing(&self) -> SqueezedBacking {
155 SqueezedBacking::Arrow(self.disk_backing_size)
156 }
157}
158
159#[derive(Default)]
160struct VariantTreeNode {
161 len: usize,
162 leaf: Option<ArrayRef>,
163 children: AHashMap<String, VariantTreeNode>,
164}
165
166impl VariantTreeNode {
167 fn new(len: usize) -> Self {
168 Self {
169 len,
170 leaf: None,
171 children: AHashMap::new(),
172 }
173 }
174
175 fn insert(&mut self, segments: &[&str], values: ArrayRef) {
176 if segments.is_empty() {
177 self.leaf = Some(values);
178 return;
179 }
180 let (head, tail) = segments.split_first().unwrap();
181 self.children
182 .entry(head.to_string())
183 .or_insert_with(|| VariantTreeNode::new(self.len))
184 .insert(tail, values);
185 }
186
187 fn into_struct_array(self) -> Arc<StructArray> {
188 let mut fields = Vec::with_capacity(self.children.len());
189 let mut arrays = Vec::with_capacity(self.children.len());
190 let mut entries: Vec<_> = self.children.into_iter().collect();
191 entries.sort_by(|a, b| a.0.cmp(&b.0));
192 for (name, child) in entries {
193 let field_array = child.into_field_array();
194 fields.push(Arc::new(Field::new(
195 name.as_str(),
196 field_array.data_type().clone(),
197 false,
198 )));
199 arrays.push(field_array);
200 }
201 Arc::new(StructArray::new(Fields::from(fields), arrays, None))
202 }
203
204 fn into_field_array(self) -> ArrayRef {
205 let len = self.len;
206 if self.children.is_empty() {
207 let values = self.leaf.expect("variant leaf value present");
208 wrap_typed_value(len, values)
209 } else {
210 let typed_struct = self.into_struct_array() as ArrayRef;
211 wrap_typed_value(len, typed_struct)
212 }
213 }
214}
215
216fn wrap_typed_value(len: usize, values: ArrayRef) -> ArrayRef {
217 let placeholder = Arc::new(BinaryViewArray::new_null(len)) as ArrayRef;
218 Arc::new(StructArray::new(
219 Fields::from(vec![
220 Arc::new(Field::new("value", DataType::BinaryView, true)),
221 Arc::new(Field::new("typed_value", values.data_type().clone(), true)),
222 ]),
223 vec![placeholder, values],
224 None,
225 )) as ArrayRef
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int64Array, StringArray};
    use arrow_schema::DataType;

    use crate::liquid_array::{LiquidByteViewArray, LiquidPrimitiveArray, raw::FsstArray};

    /// Wraps `values` in a liquid `Int64` array.
    fn int64_liquid(values: Vec<i64>) -> LiquidArrayRef {
        Arc::new(
            LiquidPrimitiveArray::<arrow::datatypes::Int64Type>::from_arrow_array(
                Int64Array::from(values),
            ),
        )
    }

    /// One-row fixture with a string path `did` and an integer path `time_us`.
    fn did_and_time_us() -> VariantStructSqueezedArray {
        let did_arrow = StringArray::from(vec![Some("d")]);
        let (_comp, did_liquid) = LiquidByteViewArray::<FsstArray>::train_from_arrow(&did_arrow);
        let did_liquid: LiquidArrayRef = Arc::new(did_liquid);

        VariantStructSqueezedArray::new(
            vec![
                (Arc::from("did"), did_liquid),
                (Arc::from("time_us"), int64_liquid(vec![1])),
            ],
            None,
            DataType::Struct(Fields::from(Vec::<Arc<Field>>::new())),
            0,
        )
    }

    /// Downcasts an Arrow array to a `StructArray`.
    fn as_struct(array: &ArrayRef) -> &StructArray {
        array
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("struct array")
    }

    /// Returns the `typed_value` child of a shredded struct.
    fn typed_value_of(parent: &StructArray) -> &StructArray {
        as_struct(parent.column_by_name("typed_value").expect("typed_value column"))
    }

    /// Field names of a struct, in schema order.
    fn field_names(array: &StructArray) -> Vec<String> {
        array.fields().iter().map(|f| f.name().clone()).collect()
    }

    #[test]
    fn to_arrow_array_with_paths_prunes_extra_fields() {
        let squeezed = did_and_time_us();

        let array = squeezed
            .to_arrow_array_with_paths(["time_us"])
            .expect("arrow array");
        let typed_value = typed_value_of(as_struct(&array));
        assert_eq!(field_names(typed_value), vec!["time_us"]);
    }

    #[test]
    fn to_arrow_array_with_unknown_paths_returns_all_fields() {
        let squeezed = did_and_time_us();

        let array = squeezed
            .to_arrow_array_with_paths(["missing"])
            .expect("arrow array");
        let typed_value = typed_value_of(as_struct(&array));
        assert_eq!(field_names(typed_value), vec!["did", "time_us"]);
    }

    #[test]
    fn nested_paths_build_nested_typed_structs() {
        let squeezed = VariantStructSqueezedArray::new(
            vec![
                (Arc::from("a.b"), int64_liquid(vec![1, 2])),
                (Arc::from("a.c"), int64_liquid(vec![3, 4])),
            ],
            None,
            DataType::Struct(Fields::from(Vec::<Arc<Field>>::new())),
            0,
        );

        let array = squeezed
            .to_arrow_array_with_paths(["a.b", "a.c"])
            .expect("arrow array");
        let root = as_struct(&array);
        assert_eq!(root.len(), 2);

        let typed_value = typed_value_of(root);
        assert_eq!(field_names(typed_value), vec!["a"]);

        // `a` is a shredded field `{ value, typed_value }` whose typed_value
        // nests the `b` and `c` leaves.
        let a = as_struct(typed_value.column_by_name("a").expect("a column"));
        assert_eq!(field_names(a), vec!["value", "typed_value"]);
        assert_eq!(field_names(typed_value_of(a)), vec!["b", "c"]);
    }
}