// liquid_cache_datafusion/cache/column.rs
use arrow::{
2 array::{Array, ArrayRef, AsArray, BooleanArray},
3 buffer::BooleanBuffer,
4 compute::prep_null_mask_filter,
5 record_batch::RecordBatch,
6};
7use arrow_schema::{ArrowError, DataType, Field, Schema};
8use liquid_cache::cache::{CacheExpression, LiquidCache};
9use liquid_cache::utils::VariantSchema;
10use liquid_cache::utils::typed_struct_contains_path;
11use parquet::arrow::arrow_reader::ArrowPredicate;
12use parquet_variant_compute::{VariantArray, VariantType, shred_variant, unshred_variant};
13
14use crate::{
15 LiquidPredicate,
16 cache::{BatchID, ColumnAccessPath, ParquetArrayID},
17 optimizers::{
18 DATE_MAPPING_METADATA_KEY, STRING_FINGERPRINT_METADATA_KEY, variant_mappings_from_field,
19 },
20};
21use std::sync::Arc;
22
/// A single cached column of a Parquet file, addressed per batch.
///
/// Wraps the shared [`LiquidCache`] store together with the Arrow field
/// describing the column and the path used to key cache entries.
#[derive(Debug)]
pub struct CachedColumn {
    // Shared handle to the backing cache store.
    cache_store: Arc<LiquidCache>,
    // Arrow field for this column (name, data type, metadata).
    field: Arc<Field>,
    // Location of this column inside the cache hierarchy; combined with a
    // `BatchID` to form per-batch entry ids.
    column_path: ColumnAccessPath,
    // Squeeze expression inferred from the field metadata, if any.
    expression: Option<Arc<CacheExpression>>,
}

/// Shared, cheaply-clonable reference to a [`CachedColumn`].
pub type CachedColumnRef = Arc<CachedColumn>;
34
35fn infer_expression(field: &Field) -> Option<CacheExpression> {
36 if let Some(mapping) = field.metadata().get(DATE_MAPPING_METADATA_KEY)
37 && matches!(
38 field.data_type(),
39 DataType::Date32 | DataType::Timestamp(_, _)
40 )
41 && let Some(expr) = CacheExpression::try_from_date_part_str(mapping)
42 {
43 return Some(expr);
44 }
45 if field
46 .metadata()
47 .contains_key(STRING_FINGERPRINT_METADATA_KEY)
48 && is_string_type(field.data_type())
49 {
50 return Some(CacheExpression::substring_search());
51 }
52 if field.try_extension_type::<VariantType>().is_ok()
53 && let Some(mappings) = variant_mappings_from_field(field)
54 {
55 let typed_specs: Vec<_> = mappings
56 .into_iter()
57 .filter_map(|mapping| mapping.data_type.map(|data_type| (mapping.path, data_type)))
58 .collect();
59 if !typed_specs.is_empty() {
60 return Some(CacheExpression::variant_get_many(typed_specs));
61 }
62 }
63 None
64}
65
/// Error returned when inserting an Arrow array into the cache fails.
#[derive(Debug)]
pub enum InsertArrowArrayError {
    /// An entry for this column/batch is already present in the cache.
    AlreadyCached,
}
72
73impl CachedColumn {
74 pub(crate) fn new(
75 field: Arc<Field>,
76 cache_store: Arc<LiquidCache>,
77 column_access_path: ColumnAccessPath,
78 is_predicate_column: bool,
79 ) -> Self {
80 column_access_path.initialize_dir(cache_store.config().cache_root_dir());
81
82 let expression = infer_expression(field.as_ref()).map(Arc::new);
83 if let Some(expr) = expression.as_ref() {
84 let hint_entry_id = column_access_path.entry_id(BatchID::from_raw(0)).into();
85 cache_store.add_squeeze_hint(&hint_entry_id, expr.clone());
86 } else if is_predicate_column {
87 let hint_entry_id = column_access_path.entry_id(BatchID::from_raw(0)).into();
88 cache_store
89 .add_squeeze_hint(&hint_entry_id, Arc::new(CacheExpression::PredicateColumn));
90 }
91 Self {
92 field,
93 cache_store,
94 column_path: column_access_path,
95 expression,
96 }
97 }
98
99 pub(crate) fn entry_id(&self, batch_id: BatchID) -> ParquetArrayID {
101 self.column_path.entry_id(batch_id)
102 }
103
104 pub(crate) fn is_cached(&self, batch_id: BatchID) -> bool {
105 self.cache_store.is_cached(&self.entry_id(batch_id).into())
106 }
107
108 pub fn field(&self) -> Arc<Field> {
110 self.field.clone()
111 }
112
113 pub fn expression(&self) -> Option<Arc<CacheExpression>> {
115 self.expression.clone()
116 }
117
118 fn array_to_record_batch(&self, array: ArrayRef) -> RecordBatch {
119 let schema = Arc::new(Schema::new(vec![self.field.clone()]));
120 RecordBatch::try_new(schema, vec![array]).unwrap()
121 }
122
123 pub async fn eval_predicate_with_filter(
125 &self,
126 batch_id: BatchID,
127 filter: &BooleanBuffer,
128 predicate: &mut LiquidPredicate,
129 ) -> Option<Result<BooleanArray, ArrowError>> {
130 let entry_id = self.entry_id(batch_id).into();
131 let result = self
132 .cache_store
133 .eval_predicate(&entry_id, predicate.physical_expr_physical_column_index())
134 .with_selection(filter)
135 .await?;
136 match result {
137 Ok(boolean_array) => {
138 let predicate_filter = match boolean_array.null_count() {
139 0 => boolean_array,
140 _ => prep_null_mask_filter(&boolean_array),
141 };
142 Some(Ok(predicate_filter))
143 }
144 Err(array) => {
145 let mut array = array;
146 if let Some(transformed) = maybe_shred_variant_array(&array, self.field.as_ref()) {
147 array = transformed;
148 }
149 let record_batch = self.array_to_record_batch(array);
150 let boolean_array = match predicate.evaluate(record_batch) {
151 Ok(arr) => arr,
152 Err(err) => return Some(Err(err)),
153 };
154 let predicate_filter = match boolean_array.null_count() {
155 0 => boolean_array,
156 _ => prep_null_mask_filter(&boolean_array),
157 };
158 Some(Ok(predicate_filter))
159 }
160 }
161 }
162
163 pub async fn get_arrow_array_with_filter(
165 &self,
166 batch_id: BatchID,
167 filter: &BooleanBuffer,
168 ) -> Option<ArrayRef> {
169 let entry_id = self.entry_id(batch_id).into();
170 let mut array = self
171 .cache_store
172 .get(&entry_id)
173 .with_selection(filter)
174 .with_optional_expression_hint(self.expression())
175 .read()
176 .await?;
177 if let Some(transformed) = maybe_shred_variant_array(&array, self.field.as_ref()) {
178 array = transformed;
179 }
180 Some(array)
181 }
182
183 #[cfg(test)]
184 pub(crate) async fn get_arrow_array_test_only(&self, batch_id: BatchID) -> Option<ArrayRef> {
185 let entry_id = self.entry_id(batch_id).into();
186 self.cache_store.get(&entry_id).await
187 }
188
189 pub async fn insert(
191 self: &Arc<Self>,
192 batch_id: BatchID,
193 array: ArrayRef,
194 ) -> Result<(), InsertArrowArrayError> {
195 if self.is_cached(batch_id) {
196 return Err(InsertArrowArrayError::AlreadyCached);
197 }
198
199 let mut array = array;
200 if let Some(transformed) = maybe_shred_variant_array(&array, self.field.as_ref()) {
201 array = transformed;
202 }
203 self.cache_store
204 .insert(self.entry_id(batch_id).into(), array)
205 .await;
206 Ok(())
207 }
208}
209
210fn maybe_shred_variant_array(array: &ArrayRef, field: &Field) -> Option<ArrayRef> {
211 let mappings = variant_mappings_from_field(field)?;
212 let typed_specs: Vec<(String, DataType)> = mappings
213 .into_iter()
214 .filter_map(|mapping| mapping.data_type.map(|data_type| (mapping.path, data_type)))
215 .collect();
216 if typed_specs.is_empty() {
217 return None;
218 }
219 shred_variant_array(array, field, &typed_specs)
220}
221
222fn shred_variant_array(
223 array: &ArrayRef,
224 field: &Field,
225 specs: &[(String, DataType)],
226) -> Option<ArrayRef> {
227 if specs.is_empty() {
228 return None;
229 }
230
231 let variant_array = VariantArray::try_new(array.as_ref()).ok()?;
232 let missing_specs: Vec<_> = specs
233 .iter()
234 .filter(|(path, _)| !variant_contains_typed_field(&variant_array, path))
235 .collect();
236 if missing_specs.is_empty() {
237 return None;
238 }
239
240 let target_fields = match field.data_type() {
241 DataType::Struct(fields) => fields.clone(),
242 _ => return None,
243 };
244 let typed_schema = target_fields
245 .iter()
246 .find(|child| child.name() == "typed_value")
247 .cloned()?;
248 let mut schema = VariantSchema::new(Some(typed_schema.as_ref()));
249 for (path, data_type) in missing_specs {
250 schema.insert_path(path, data_type);
251 }
252 let shredding_schema = schema.shredding_type()?;
253 let unshredded = unshred_variant(&variant_array).ok()?;
254 let shredded = shred_variant(&unshredded, &shredding_schema).ok()?;
255 Some(Arc::new(shredded.into_inner()))
256}
257
258fn variant_contains_typed_field(array: &VariantArray, path: &str) -> bool {
259 let Some(typed_field) = array.typed_value_field() else {
260 return false;
261 };
262 let Some(typed_root) = typed_field.as_struct_opt() else {
263 return false;
264 };
265 typed_struct_contains_path(typed_root, path)
266}
267
268fn is_string_type(data_type: &DataType) -> bool {
269 match data_type {
270 DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8 => true,
271 DataType::Dictionary(_, value_type) => is_string_type(value_type.as_ref()),
272 _ => false,
273 }
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizers::{
        VARIANT_MAPPING_METADATA_KEY, VariantField, enrich_variant_field_type,
    };
    use arrow::array::{ArrayRef, StringArray, StructArray};
    use parquet::variant::{VariantType, json_to_variant};
    use serde_json::json;
    use std::collections::HashMap;

    // Verifies that `maybe_shred_variant_array` materializes every typed path
    // from the field's variant-mapping metadata into the `typed_value` struct.
    #[test]
    fn shredding_adds_all_variant_paths() {
        // Two JSON documents with the same shape: {name: string, age: int}.
        let values = StringArray::from(vec![
            Some(r#"{"name":"Alice","age":30}"#),
            Some(r#"{"name":"Bob","age":27}"#),
        ]);
        let variant = json_to_variant(&(Arc::new(values) as ArrayRef)).expect("variant");

        // Mapping metadata requesting typed shredding for both paths.
        let mut metadata = HashMap::new();
        metadata.insert(
            VARIANT_MAPPING_METADATA_KEY.to_string(),
            serde_json::to_string(&vec![
                json!({"path": "name", "type": "Utf8"}),
                json!({"path": "age", "type": "Int64"}),
            ])
            .unwrap(),
        );

        let variant_fields = vec![
            VariantField {
                path: "name".to_string(),
                data_type: Some(DataType::Utf8),
            },
            VariantField {
                path: "age".to_string(),
                data_type: Some(DataType::Int64),
            },
        ];

        // Build a variant-typed field whose type is enriched with the requested
        // shredded paths, carrying the mapping metadata through.
        let base_field = Field::new("variant", variant.inner().data_type().clone(), true)
            .with_extension_type(VariantType)
            .with_metadata(metadata);
        let enriched = enrich_variant_field_type(base_field.as_ref(), &variant_fields)
            .with_metadata(base_field.metadata().clone());
        let array: ArrayRef = ArrayRef::from(variant);

        let shredded = maybe_shred_variant_array(&array, enriched.as_ref())
            .expect("variant should be shredded");
        let shredded_struct = shredded
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("struct array");
        let typed_value = shredded_struct
            .column_by_name("typed_value")
            .expect("typed_value column");
        let typed_struct = typed_value
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("typed struct");

        // Each shredded path appears as a nested struct with its own
        // `typed_value` child holding the requested Arrow type.
        let name_struct = typed_struct
            .column_by_name("name")
            .expect("name path")
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("name struct");
        let name_values = name_struct
            .column_by_name("typed_value")
            .expect("name typed value");
        assert_eq!(name_values.data_type(), &DataType::Utf8);

        let age_struct = typed_struct
            .column_by_name("age")
            .expect("age path")
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("age struct");
        let age_values = age_struct
            .column_by_name("typed_value")
            .expect("age typed value");
        assert_eq!(age_values.data_type(), &DataType::Int64);
    }
}
359}