// liquid_cache_datafusion/optimizers/mod.rs

1//! Optimizers for the Parquet module
2
3mod lineage_opt;
4
5use std::{str::FromStr, sync::Arc};
6
7use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
8use datafusion::{
9    catalog::memory::DataSourceExec,
10    common::tree_node::{Transformed, TreeNode, TreeNodeRecursion},
11    config::ConfigOptions,
12    datasource::{
13        physical_plan::{FileSource, ParquetSource},
14        source::DataSource,
15        table_schema::TableSchema,
16    },
17    physical_expr_adapter::PhysicalExprAdapterFactory,
18    physical_optimizer::PhysicalOptimizerRule,
19    physical_plan::ExecutionPlan,
20};
21pub use lineage_opt::LineageOptimizer;
22pub(crate) use lineage_opt::VariantField;
23
24use crate::{
25    LiquidCacheParquetRef, LiquidParquetSource,
26    optimizers::lineage_opt::{ColumnAnnotation, metadata_from_factory, serialize_date_part},
27};
28use liquid_cache::utils::VariantSchema;
29use serde::{Deserialize, Serialize};
30
/// Field-metadata key: serialized date-part extraction applied to the column.
pub(crate) const DATE_MAPPING_METADATA_KEY: &str = "liquid.cache.date_mapping";
/// Field-metadata key: variant path mapping(s) — a JSON array of entries, or a
/// bare path string in the fallback single-path form (see
/// `variant_mappings_from_field`).
pub(crate) const VARIANT_MAPPING_METADATA_KEY: &str = "liquid.cache.variant_path";
/// Field-metadata key: Arrow data type accompanying the single-path variant form.
pub(crate) const VARIANT_MAPPING_TYPE_METADATA_KEY: &str = "liquid.cache.variant_type";
/// Field-metadata key: marks a string column for substring-search fingerprinting.
pub(crate) const STRING_FINGERPRINT_METADATA_KEY: &str = "liquid.cache.string_fingerprint";
35
/// JSON-serializable form of a [`VariantField`]: a path into the variant value
/// plus an optional Arrow data type rendered as a string.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct VariantMappingSerdeEntry {
    // Path into the variant value.
    path: String,
    // Target Arrow type via `DataType::to_string`; serialized as "type" and
    // omitted entirely when absent.
    #[serde(rename = "type", skip_serializing_if = "Option::is_none")]
    data_type: Option<String>,
}
42
43pub(crate) fn serialize_variant_mappings(fields: &[VariantField]) -> Option<String> {
44    if fields.is_empty() {
45        return None;
46    }
47
48    let entries: Vec<VariantMappingSerdeEntry> = fields
49        .iter()
50        .map(|field| VariantMappingSerdeEntry {
51            path: field.path.clone(),
52            data_type: field
53                .data_type
54                .as_ref()
55                .map(|data_type| data_type.to_string()),
56        })
57        .collect();
58
59    serde_json::to_string(&entries).ok()
60}
61
62fn deserialize_variant_mappings(raw: &str) -> Option<Vec<VariantField>> {
63    let entries: Vec<VariantMappingSerdeEntry> = serde_json::from_str(raw).ok()?;
64    let mut fields = Vec::with_capacity(entries.len());
65    for entry in entries {
66        let data_type = match entry.data_type {
67            Some(spec) => Some(DataType::from_str(&spec).ok()?),
68            None => None,
69        };
70        fields.push(VariantField {
71            path: entry.path,
72            data_type,
73        });
74    }
75    Some(fields)
76}
77
78pub(crate) fn variant_mappings_from_field(field: &Field) -> Option<Vec<VariantField>> {
79    let metadata = field.metadata();
80    let raw = metadata.get(VARIANT_MAPPING_METADATA_KEY)?;
81    if let Some(parsed) = deserialize_variant_mappings(raw) {
82        return Some(parsed);
83    }
84
85    let data_type = metadata
86        .get(VARIANT_MAPPING_TYPE_METADATA_KEY)
87        .and_then(|spec| DataType::from_str(spec).ok());
88
89    Some(vec![VariantField {
90        path: raw.clone(),
91        data_type,
92    }])
93}
94
/// Physical optimizer rule for local mode liquid cache
///
/// This optimizer rewrites DataSourceExec nodes that read Parquet files
/// to use LiquidParquetSource instead of the default ParquetSource
#[derive(Debug)]
pub struct LocalModeOptimizer {
    // Shared cache instance injected into every rewritten Parquet source.
    cache: LiquidCacheParquetRef,
    // When true, variant-path annotations are written into the scan schema and
    // the variant field's data type is rewritten (see `process_field_annotation`).
    eager_shredding: bool,
}
104
105impl LocalModeOptimizer {
106    /// Create an optimizer with an existing cache instance
107    pub fn new(cache: LiquidCacheParquetRef, eager_shredding: bool) -> Self {
108        Self {
109            cache,
110            eager_shredding,
111        }
112    }
113
114    /// Create an optimizer with an existing cache instance
115    pub fn with_cache(cache: LiquidCacheParquetRef) -> Self {
116        Self {
117            cache,
118            eager_shredding: true,
119        }
120    }
121}
122
impl PhysicalOptimizerRule for LocalModeOptimizer {
    /// Rewrite every Parquet `DataSourceExec` in `plan` to use the liquid cache.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>, datafusion::error::DataFusionError> {
        Ok(rewrite_data_source_plan(
            plan,
            &self.cache,
            self.eager_shredding,
        ))
    }

    /// Name reported in optimizer traces and explain output.
    fn name(&self) -> &str {
        "LocalModeLiquidCacheOptimizer"
    }

    fn schema_check(&self) -> bool {
        // We deliberately enrich scan schemas with metadata describing variant/date
        // extractions, so allow the optimizer to adjust schema metadata.
        false
    }
}
146
147/// Rewrite the data source plan to use liquid cache.
148pub fn rewrite_data_source_plan(
149    plan: Arc<dyn ExecutionPlan>,
150    cache: &LiquidCacheParquetRef,
151    eager_shredding: bool,
152) -> Arc<dyn ExecutionPlan> {
153    let rewritten = plan
154        .transform_up(|node| try_optimize_parquet_source(node, cache, eager_shredding))
155        .unwrap();
156    rewritten.data
157}
158
159fn try_optimize_parquet_source(
160    plan: Arc<dyn ExecutionPlan>,
161    cache: &LiquidCacheParquetRef,
162    eager_shredding: bool,
163) -> Result<Transformed<Arc<dyn ExecutionPlan>>, datafusion::error::DataFusionError> {
164    let any_plan = plan.as_any();
165    if let Some(data_source_exec) = any_plan.downcast_ref::<DataSourceExec>()
166        && let Some((file_scan_config, parquet_source)) =
167            data_source_exec.downcast_to_file_source::<ParquetSource>()
168    {
169        let mut new_config = file_scan_config.clone();
170
171        let mut new_source =
172            LiquidParquetSource::from_parquet_source(parquet_source.clone(), cache.clone());
173        if let Some(expr_adapter_factory) = file_scan_config.expr_adapter_factory.as_ref() {
174            let new_schema = enrich_source_schema(
175                file_scan_config.file_schema(),
176                expr_adapter_factory,
177                eager_shredding,
178            );
179            let table_partition_cols = new_source.table_schema().table_partition_cols();
180            let new_table_schema =
181                TableSchema::new(Arc::new(new_schema), table_partition_cols.clone());
182            new_source = new_source.with_table_schema(new_table_schema);
183        }
184
185        new_config.file_source = Arc::new(new_source);
186        let new_file_source: Arc<dyn DataSource> = Arc::new(new_config);
187        let new_plan = Arc::new(DataSourceExec::new(new_file_source));
188
189        return Ok(Transformed::new(
190            new_plan,
191            true,
192            TreeNodeRecursion::Continue,
193        ));
194    }
195    Ok(Transformed::no(plan))
196}
197
198fn enrich_source_schema(
199    file_schema: &SchemaRef,
200    expr_adapter_factory: &Arc<dyn PhysicalExprAdapterFactory>,
201    eager_shredding: bool,
202) -> Schema {
203    let mut new_fields = vec![];
204    for field in file_schema.fields() {
205        if let Some(annotation) = metadata_from_factory(expr_adapter_factory, field.name()) {
206            new_fields.push(process_field_annotation(field, annotation, eager_shredding));
207        } else {
208            new_fields.push(field.clone());
209        }
210    }
211    Schema::new(new_fields)
212}
213
214fn process_field_annotation(
215    field: &Arc<Field>,
216    annotation: ColumnAnnotation,
217    eager_shredding: bool,
218) -> Arc<Field> {
219    let mut field_metadata = field.metadata().clone();
220    let mut updated_field = Field::clone(field.as_ref());
221    match annotation {
222        ColumnAnnotation::DatePart(unit) => {
223            field_metadata.insert(
224                DATE_MAPPING_METADATA_KEY.to_string(),
225                serialize_date_part(&unit),
226            );
227        }
228        ColumnAnnotation::VariantPaths(paths) => {
229            if eager_shredding {
230                if let Some(serialized) = serialize_variant_mappings(&paths) {
231                    field_metadata.insert(VARIANT_MAPPING_METADATA_KEY.to_string(), serialized);
232                }
233                updated_field = enrich_variant_field_type(&updated_field, &paths);
234            }
235        }
236        ColumnAnnotation::SubstringSearch => {
237            field_metadata.insert(
238                STRING_FINGERPRINT_METADATA_KEY.to_string(),
239                "substring".into(),
240            );
241        }
242    }
243    Arc::new(updated_field.with_metadata(field_metadata))
244}
245
246pub(crate) fn enrich_variant_field_type(field: &Field, fields: &[VariantField]) -> Field {
247    let typed_specs: Vec<&VariantField> = fields
248        .iter()
249        .filter(|field| field.data_type.is_some())
250        .collect();
251    if typed_specs.is_empty() {
252        return Field::clone(field);
253    }
254
255    let new_type = match field.data_type() {
256        DataType::Struct(children) => {
257            let mut rewritten = Vec::with_capacity(children.len() + 1);
258            let mut replaced = false;
259            for child in children.iter() {
260                if child.name() == "typed_value" {
261                    rewritten.push(build_variant_typed_value_field(
262                        Some(child.as_ref()),
263                        &typed_specs,
264                    ));
265                    replaced = true;
266                } else {
267                    let mut child_field = child.as_ref().clone();
268                    if child_field.name() == "value" {
269                        child_field =
270                            Field::new(child_field.name(), child_field.data_type().clone(), true)
271                                .with_metadata(child_field.metadata().clone());
272                    }
273                    rewritten.push(Arc::new(child_field));
274                }
275            }
276            if !replaced {
277                rewritten.push(build_variant_typed_value_field(None, &typed_specs));
278            }
279            DataType::Struct(Fields::from(rewritten))
280        }
281        other => other.clone(),
282    };
283    Field::clone(field).with_data_type(new_type)
284}
285
286pub(crate) fn enrich_schema_for_cache(schema: &SchemaRef) -> SchemaRef {
287    let mut fields = vec![];
288    for field in schema.fields() {
289        let new_field = if let Some(mappings) = variant_mappings_from_field(field.as_ref()) {
290            Arc::new(enrich_variant_field_type(field.as_ref(), &mappings))
291        } else {
292            field.clone()
293        };
294        fields.push(new_field);
295    }
296    Arc::new(Schema::new(fields))
297}
298
299fn build_variant_typed_value_field(
300    existing: Option<&Field>,
301    specs: &[&VariantField],
302) -> Arc<Field> {
303    let mut schema = VariantSchema::new(existing);
304    for spec in specs {
305        if let Some(data_type) = spec.data_type.as_ref() {
306            schema.insert_path(&spec.path, data_type);
307        }
308    }
309
310    Arc::new(Field::new(
311        "typed_value",
312        DataType::Struct(Fields::from(schema.typed_fields())),
313        true,
314    ))
315}
316
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use datafusion::{datasource::physical_plan::FileScanConfig, prelude::SessionContext};
    use liquid_cache::{
        cache::{AlwaysHydrate, squeeze_policies::TranscodeSqueezeEvict},
        cache_policies::LiquidPolicy,
    };

    use crate::LiquidCacheParquet;
    use liquid_cache_common::IoMode;

    use super::*;

    // Rewrite `plan` with a fresh cache and assert that every rewritten
    // DataSourceExec now uses a LiquidParquetSource with the original schema.
    fn rewrite_plan_inner(plan: Arc<dyn ExecutionPlan>) {
        let expected_schema = plan.schema();
        // NOTE(review): constructor args presumably (batch size, max cache
        // size, disk dir, policies, io mode) — verify against LiquidCacheParquet::new.
        let liquid_cache = Arc::new(LiquidCacheParquet::new(
            8192,
            1000000,
            PathBuf::from("test"),
            Box::new(LiquidPolicy::new()),
            Box::new(TranscodeSqueezeEvict),
            Box::new(AlwaysHydrate::new()),
            IoMode::Uring,
        ));
        let rewritten = rewrite_data_source_plan(plan, &liquid_cache, true);

        rewritten
            .apply(|node| {
                if let Some(plan) = node.as_any().downcast_ref::<DataSourceExec>() {
                    let data_source = plan.data_source();
                    let any_source = data_source.as_any();
                    let source = any_source.downcast_ref::<FileScanConfig>().unwrap();
                    let file_source = source.file_source();
                    let any_file_source = file_source.as_any();
                    // The rewrite must have swapped in a LiquidParquetSource.
                    let _parquet_source = any_file_source
                        .downcast_ref::<LiquidParquetSource>()
                        .unwrap();
                    // The file schema must match the original plan's schema.
                    let schema = source.file_schema().as_ref();
                    assert_eq!(schema, expected_schema.as_ref());
                }
                Ok(TreeNodeRecursion::Continue)
            })
            .unwrap();
    }

    // End-to-end: plan a filtered scan over a sample Parquet file and verify
    // the rewrite applies cleanly.
    #[tokio::test]
    async fn test_plan_rewrite() {
        let ctx = SessionContext::new();
        ctx.register_parquet(
            "nano_hits",
            "../../examples/nano_hits.parquet",
            Default::default(),
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT * FROM nano_hits WHERE \"URL\" like 'https://%' limit 10")
            .await
            .unwrap();
        let plan = df.create_physical_plan().await.unwrap();
        rewrite_plan_inner(plan.clone());
    }
}
381}