// liquid_cache_datafusion/cache/column.rs
1use arrow::{
2    array::{Array, ArrayRef, AsArray, BooleanArray},
3    buffer::BooleanBuffer,
4    compute::prep_null_mask_filter,
5    record_batch::RecordBatch,
6};
7use arrow_schema::{ArrowError, DataType, Field, Schema};
8use liquid_cache::cache::{CacheExpression, LiquidCache};
9use liquid_cache::utils::VariantSchema;
10use liquid_cache::utils::typed_struct_contains_path;
11use parquet::arrow::arrow_reader::ArrowPredicate;
12use parquet_variant_compute::{VariantArray, VariantType, shred_variant, unshred_variant};
13
14use crate::{
15    LiquidPredicate,
16    cache::{BatchID, ColumnAccessPath, ParquetArrayID},
17    optimizers::{
18        DATE_MAPPING_METADATA_KEY, STRING_FINGERPRINT_METADATA_KEY, variant_mappings_from_field,
19    },
20};
21use std::sync::Arc;
22
/// A column in the cache.
///
/// Bundles the Arrow [`Field`] describing the column, the access path used to
/// derive per-batch cache entry ids, and an optional expression inferred from
/// the field metadata (registered as a squeeze hint on construction).
#[derive(Debug)]
pub struct CachedColumn {
    // Shared cache store backing all columns.
    cache_store: Arc<LiquidCache>,
    // Arrow schema field for this column, including its metadata map.
    field: Arc<Field>,
    // Path from which per-batch `ParquetArrayID` entry ids are derived.
    column_path: ColumnAccessPath,
    // Expression inferred from field metadata, if any; see `infer_expression`.
    expression: Option<Arc<CacheExpression>>,
}

/// A reference to a cached column.
pub type CachedColumnRef = Arc<CachedColumn>;
34
35fn infer_expression(field: &Field) -> Option<CacheExpression> {
36    if let Some(mapping) = field.metadata().get(DATE_MAPPING_METADATA_KEY)
37        && matches!(
38            field.data_type(),
39            DataType::Date32 | DataType::Timestamp(_, _)
40        )
41        && let Some(expr) = CacheExpression::try_from_date_part_str(mapping)
42    {
43        return Some(expr);
44    }
45    if field
46        .metadata()
47        .contains_key(STRING_FINGERPRINT_METADATA_KEY)
48        && is_string_type(field.data_type())
49    {
50        return Some(CacheExpression::substring_search());
51    }
52    if field.try_extension_type::<VariantType>().is_ok()
53        && let Some(mappings) = variant_mappings_from_field(field)
54    {
55        let typed_specs: Vec<_> = mappings
56            .into_iter()
57            .filter_map(|mapping| mapping.data_type.map(|data_type| (mapping.path, data_type)))
58            .collect();
59        if !typed_specs.is_empty() {
60            return Some(CacheExpression::variant_get_many(typed_specs));
61        }
62    }
63    None
64}
65
/// Error type for inserting an arrow array into the cache.
#[derive(Debug)]
pub enum InsertArrowArrayError {
    /// The array is already cached.
    AlreadyCached,
}

// Implement `Display` and `Error` so callers can propagate this with `?`
// into `Box<dyn Error>` / anyhow-style error types instead of matching on it.
impl std::fmt::Display for InsertArrowArrayError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::AlreadyCached => write!(f, "array is already cached"),
        }
    }
}

impl std::error::Error for InsertArrowArrayError {}
72
73impl CachedColumn {
74    pub(crate) fn new(
75        field: Arc<Field>,
76        cache_store: Arc<LiquidCache>,
77        column_access_path: ColumnAccessPath,
78        is_predicate_column: bool,
79    ) -> Self {
80        column_access_path.initialize_dir(cache_store.config().cache_root_dir());
81
82        let expression = infer_expression(field.as_ref()).map(Arc::new);
83        if let Some(expr) = expression.as_ref() {
84            let hint_entry_id = column_access_path.entry_id(BatchID::from_raw(0)).into();
85            cache_store.add_squeeze_hint(&hint_entry_id, expr.clone());
86        } else if is_predicate_column {
87            let hint_entry_id = column_access_path.entry_id(BatchID::from_raw(0)).into();
88            cache_store
89                .add_squeeze_hint(&hint_entry_id, Arc::new(CacheExpression::PredicateColumn));
90        }
91        Self {
92            field,
93            cache_store,
94            column_path: column_access_path,
95            expression,
96        }
97    }
98
99    /// row_id must be on a batch boundary.
100    pub(crate) fn entry_id(&self, batch_id: BatchID) -> ParquetArrayID {
101        self.column_path.entry_id(batch_id)
102    }
103
104    pub(crate) fn is_cached(&self, batch_id: BatchID) -> bool {
105        self.cache_store.is_cached(&self.entry_id(batch_id).into())
106    }
107
108    /// Returns the Arrow field metadata for this cached column.
109    pub fn field(&self) -> Arc<Field> {
110        self.field.clone()
111    }
112
113    /// Returns the expression metadata associated with this column, if any.
114    pub fn expression(&self) -> Option<Arc<CacheExpression>> {
115        self.expression.clone()
116    }
117
118    fn array_to_record_batch(&self, array: ArrayRef) -> RecordBatch {
119        let schema = Arc::new(Schema::new(vec![self.field.clone()]));
120        RecordBatch::try_new(schema, vec![array]).unwrap()
121    }
122
123    /// Evaluates a predicate on a cached column.
124    pub async fn eval_predicate_with_filter(
125        &self,
126        batch_id: BatchID,
127        filter: &BooleanBuffer,
128        predicate: &mut LiquidPredicate,
129    ) -> Option<Result<BooleanArray, ArrowError>> {
130        let entry_id = self.entry_id(batch_id).into();
131        let result = self
132            .cache_store
133            .eval_predicate(&entry_id, predicate.physical_expr_physical_column_index())
134            .with_selection(filter)
135            .await?;
136        match result {
137            Ok(boolean_array) => {
138                let predicate_filter = match boolean_array.null_count() {
139                    0 => boolean_array,
140                    _ => prep_null_mask_filter(&boolean_array),
141                };
142                Some(Ok(predicate_filter))
143            }
144            Err(array) => {
145                let mut array = array;
146                if let Some(transformed) = maybe_shred_variant_array(&array, self.field.as_ref()) {
147                    array = transformed;
148                }
149                let record_batch = self.array_to_record_batch(array);
150                let boolean_array = match predicate.evaluate(record_batch) {
151                    Ok(arr) => arr,
152                    Err(err) => return Some(Err(err)),
153                };
154                let predicate_filter = match boolean_array.null_count() {
155                    0 => boolean_array,
156                    _ => prep_null_mask_filter(&boolean_array),
157                };
158                Some(Ok(predicate_filter))
159            }
160        }
161    }
162
163    /// Get an arrow array with a filter applied.
164    pub async fn get_arrow_array_with_filter(
165        &self,
166        batch_id: BatchID,
167        filter: &BooleanBuffer,
168    ) -> Option<ArrayRef> {
169        let entry_id = self.entry_id(batch_id).into();
170        let mut array = self
171            .cache_store
172            .get(&entry_id)
173            .with_selection(filter)
174            .with_optional_expression_hint(self.expression())
175            .read()
176            .await?;
177        if let Some(transformed) = maybe_shred_variant_array(&array, self.field.as_ref()) {
178            array = transformed;
179        }
180        Some(array)
181    }
182
183    #[cfg(test)]
184    pub(crate) async fn get_arrow_array_test_only(&self, batch_id: BatchID) -> Option<ArrayRef> {
185        let entry_id = self.entry_id(batch_id).into();
186        self.cache_store.get(&entry_id).await
187    }
188
189    /// Insert an array into the cache.
190    pub async fn insert(
191        self: &Arc<Self>,
192        batch_id: BatchID,
193        array: ArrayRef,
194    ) -> Result<(), InsertArrowArrayError> {
195        if self.is_cached(batch_id) {
196            return Err(InsertArrowArrayError::AlreadyCached);
197        }
198
199        let mut array = array;
200        if let Some(transformed) = maybe_shred_variant_array(&array, self.field.as_ref()) {
201            array = transformed;
202        }
203        self.cache_store
204            .insert(self.entry_id(batch_id).into(), array)
205            .await;
206        Ok(())
207    }
208}
209
210fn maybe_shred_variant_array(array: &ArrayRef, field: &Field) -> Option<ArrayRef> {
211    let mappings = variant_mappings_from_field(field)?;
212    let typed_specs: Vec<(String, DataType)> = mappings
213        .into_iter()
214        .filter_map(|mapping| mapping.data_type.map(|data_type| (mapping.path, data_type)))
215        .collect();
216    if typed_specs.is_empty() {
217        return None;
218    }
219    shred_variant_array(array, field, &typed_specs)
220}
221
/// Re-shreds a variant array so that every `(path, data_type)` spec in
/// `specs` exists as a typed field.
///
/// Returns `None` when there is nothing to do or shredding cannot proceed:
/// empty specs, the array is not a valid [`VariantArray`], every spec is
/// already shredded, the target field is not a struct, or it lacks a
/// `typed_value` child.
fn shred_variant_array(
    array: &ArrayRef,
    field: &Field,
    specs: &[(String, DataType)],
) -> Option<ArrayRef> {
    if specs.is_empty() {
        return None;
    }

    let variant_array = VariantArray::try_new(array.as_ref()).ok()?;
    // Only paths that are not already present as typed fields need shredding.
    let missing_specs: Vec<_> = specs
        .iter()
        .filter(|(path, _)| !variant_contains_typed_field(&variant_array, path))
        .collect();
    if missing_specs.is_empty() {
        return None;
    }

    // The target field must be the shredded struct layout with a
    // `typed_value` child; seed the schema from it so paths that are already
    // shredded are preserved.
    let target_fields = match field.data_type() {
        DataType::Struct(fields) => fields.clone(),
        _ => return None,
    };
    let typed_schema = target_fields
        .iter()
        .find(|child| child.name() == "typed_value")
        .cloned()?;
    let mut schema = VariantSchema::new(Some(typed_schema.as_ref()));
    for (path, data_type) in missing_specs {
        schema.insert_path(path, data_type);
    }
    let shredding_schema = schema.shredding_type()?;
    // Round-trip: fully unshred, then shred again with the merged schema.
    let unshredded = unshred_variant(&variant_array).ok()?;
    let shredded = shred_variant(&unshredded, &shredding_schema).ok()?;
    Some(Arc::new(shredded.into_inner()))
}
257
258fn variant_contains_typed_field(array: &VariantArray, path: &str) -> bool {
259    let Some(typed_field) = array.typed_value_field() else {
260        return false;
261    };
262    let Some(typed_root) = typed_field.as_struct_opt() else {
263        return false;
264    };
265    typed_struct_contains_path(typed_root, path)
266}
267
268fn is_string_type(data_type: &DataType) -> bool {
269    match data_type {
270        DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8 => true,
271        DataType::Dictionary(_, value_type) => is_string_type(value_type.as_ref()),
272        _ => false,
273    }
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizers::{
        VARIANT_MAPPING_METADATA_KEY, VariantField, enrich_variant_field_type,
    };
    use arrow::array::{ArrayRef, StringArray, StructArray};
    use parquet::variant::{VariantType, json_to_variant};
    use serde_json::json;
    use std::collections::HashMap;

    /// Shredding a variant column should produce a typed field for every path
    /// declared in the field's variant mapping metadata.
    #[test]
    fn shredding_adds_all_variant_paths() {
        // Two JSON objects encoded as a variant array.
        let values = StringArray::from(vec![
            Some(r#"{"name":"Alice","age":30}"#),
            Some(r#"{"name":"Bob","age":27}"#),
        ]);
        let variant = json_to_variant(&(Arc::new(values) as ArrayRef)).expect("variant");

        // Field metadata declaring the typed paths to shred.
        let mut metadata = HashMap::new();
        metadata.insert(
            VARIANT_MAPPING_METADATA_KEY.to_string(),
            serde_json::to_string(&vec![
                json!({"path": "name", "type": "Utf8"}),
                json!({"path": "age", "type": "Int64"}),
            ])
            .unwrap(),
        );

        // Same mappings in structured form for the type-enrichment helper.
        let variant_fields = vec![
            VariantField {
                path: "name".to_string(),
                data_type: Some(DataType::Utf8),
            },
            VariantField {
                path: "age".to_string(),
                data_type: Some(DataType::Int64),
            },
        ];

        // Build a variant-typed field carrying the mapping metadata, then
        // enrich its data type with the declared paths.
        let base_field = Field::new("variant", variant.inner().data_type().clone(), true)
            .with_extension_type(VariantType)
            .with_metadata(metadata);
        let enriched = enrich_variant_field_type(base_field.as_ref(), &variant_fields)
            .with_metadata(base_field.metadata().clone());
        let array: ArrayRef = ArrayRef::from(variant);

        let shredded = maybe_shred_variant_array(&array, enriched.as_ref())
            .expect("variant should be shredded");
        let shredded_struct = shredded
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("struct array");
        let typed_value = shredded_struct
            .column_by_name("typed_value")
            .expect("typed_value column");
        let typed_struct = typed_value
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("typed struct");

        // `name` must be shredded with a Utf8 typed value...
        let name_struct = typed_struct
            .column_by_name("name")
            .expect("name path")
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("name struct");
        let name_values = name_struct
            .column_by_name("typed_value")
            .expect("name typed value");
        assert_eq!(name_values.data_type(), &DataType::Utf8);

        // ...and `age` with an Int64 typed value.
        let age_struct = typed_struct
            .column_by_name("age")
            .expect("age path")
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("age struct");
        let age_values = age_struct
            .column_by_name("typed_value")
            .expect("age typed value");
        assert_eq!(age_values.data_type(), &DataType::Int64);
    }
}
359}