// liquid_cache_parquet/optimizers/mod.rs

//! Optimizers for the Parquet module

mod date_extract_opt;

use std::{collections::HashMap, sync::Arc};

use arrow_schema::{Field, Schema};
use datafusion::{
    catalog::memory::DataSourceExec,
    common::tree_node::{Transformed, TreeNode, TreeNodeRecursion},
    config::ConfigOptions,
    datasource::{physical_plan::ParquetSource, source::DataSource},
    physical_optimizer::PhysicalOptimizerRule,
    physical_plan::ExecutionPlan,
};
pub use date_extract_opt::DateExtractOptimizer;

use crate::{
    LiquidCacheRef, LiquidParquetSource, optimizers::date_extract_opt::metadata_from_factory,
};

/// Field-metadata key under which the date-extraction mapping (produced by
/// `date_extract_opt`) is attached to a Parquet file schema during plan rewrite.
const DATE_MAPPING_METADATA_KEY: &str = "liquid.cache.date_mapping";
23
/// Physical optimizer rule for local mode liquid cache
///
/// This optimizer rewrites DataSourceExec nodes that read Parquet files
/// to use LiquidParquetSource instead of the default ParquetSource
#[derive(Debug)]
pub struct LocalModeOptimizer {
    // Shared cache handle cloned into every rewritten Parquet source.
    cache: LiquidCacheRef,
}
32
33impl LocalModeOptimizer {
34    /// Create an optimizer with an existing cache instance
35    pub fn with_cache(cache: LiquidCacheRef) -> Self {
36        Self { cache }
37    }
38}
39
40impl PhysicalOptimizerRule for LocalModeOptimizer {
41    fn optimize(
42        &self,
43        plan: Arc<dyn ExecutionPlan>,
44        _config: &ConfigOptions,
45    ) -> Result<Arc<dyn ExecutionPlan>, datafusion::error::DataFusionError> {
46        Ok(rewrite_data_source_plan(plan, &self.cache))
47    }
48
49    fn name(&self) -> &str {
50        "LocalModeLiquidCacheOptimizer"
51    }
52
53    fn schema_check(&self) -> bool {
54        true
55    }
56}
57
/// Rewrite the data source plan to use liquid cache.
///
/// Walks `plan` bottom-up and, for every `DataSourceExec` whose file source is
/// a `ParquetSource`, swaps that source for a [`LiquidParquetSource`] bound to
/// `cache`. All other plan nodes pass through unchanged.
pub fn rewrite_data_source_plan(
    plan: Arc<dyn ExecutionPlan>,
    cache: &LiquidCacheRef,
) -> Arc<dyn ExecutionPlan> {
    let rewritten = plan
        .transform_up(|node| {
            let any_plan = node.as_any();
            if let Some(data_source_exec) = any_plan.downcast_ref::<DataSourceExec>() {
                // Only Parquet-backed scans are rewritten; other file formats
                // fall through to `Transformed::no` below.
                if let Some((file_scan_config, parquet_source)) =
                    data_source_exec.downcast_to_file_source::<ParquetSource>()
                {
                    let mut new_config = file_scan_config.clone();
                    // If a schema adapter factory is attached, it may carry a
                    // per-column date-extraction mapping (see `date_extract_opt`);
                    // surface that mapping as field metadata on the file schema
                    // under DATE_MAPPING_METADATA_KEY.
                    if let Some(schema_factory) =
                        file_scan_config.file_source().schema_adapter_factory()
                    {
                        let file_schema = file_scan_config.file_schema.clone();
                        let mut new_fields = vec![];
                        for field in file_schema.fields() {
                            let date_metadata =
                                metadata_from_factory(&schema_factory, field.name());
                            if let Some(date_metadata) = date_metadata {
                                // Rebuild the field with the mapping attached.
                                let new_field =
                                    Field::clone(field.as_ref()).with_metadata(HashMap::from([(
                                        DATE_MAPPING_METADATA_KEY.to_string(),
                                        date_metadata,
                                    )]));
                                new_fields.push(Arc::new(new_field));
                            } else {
                                // No mapping for this column; keep the field as-is.
                                let new_field = field.clone();
                                new_fields.push(new_field);
                            }
                        }
                        let new_schema = Schema::new(new_fields);
                        new_config.file_schema = Arc::new(new_schema);
                    }
                    // NOTE(review): the source is built from the ORIGINAL
                    // `file_scan_config.file_schema`, not the metadata-augmented
                    // `new_config.file_schema` assembled above — confirm whether
                    // `from_parquet_source` is meant to see the date-mapping
                    // metadata, or whether it reads it from the config later.
                    let new_source = LiquidParquetSource::from_parquet_source(
                        parquet_source.clone(),
                        file_scan_config.file_schema.clone(),
                        cache.clone(),
                    );
                    new_config.file_source = Arc::new(new_source);
                    let new_file_source: Arc<dyn DataSource> = Arc::new(new_config);
                    let new_plan = Arc::new(DataSourceExec::new(new_file_source));

                    return Ok(Transformed::new(
                        new_plan,
                        true,
                        TreeNodeRecursion::Continue,
                    ));
                }

                return Ok(Transformed::no(node));
            }
            Ok(Transformed::no(node))
        })
        // The closure above is infallible (every branch returns `Ok`), so this
        // unwrap cannot panic.
        .unwrap();
    rewritten.data
}
117
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use datafusion::{datasource::physical_plan::FileScanConfig, prelude::SessionContext};
    use liquid_cache_storage::{
        cache::squeeze_policies::TranscodeSqueezeEvict, cache_policies::LiquidPolicy,
    };

    use crate::LiquidCache;
    use liquid_cache_common::IoMode;

    use super::*;

    /// Run `rewrite_data_source_plan` over `plan` and assert that every
    /// `DataSourceExec` now carries a `LiquidParquetSource` while the file
    /// schema is left unchanged.
    fn rewrite_plan_inner(plan: Arc<dyn ExecutionPlan>) {
        let expected_schema = plan.schema();
        let liquid_cache = Arc::new(LiquidCache::new(
            8192,
            1000000,
            PathBuf::from("test"),
            Box::new(LiquidPolicy::new()),
            Box::new(TranscodeSqueezeEvict),
            IoMode::Uring,
        ));
        let rewritten = rewrite_data_source_plan(plan, &liquid_cache);

        // Walk the rewritten plan and verify each scan node was converted.
        rewritten
            .apply(|node| {
                if let Some(plan) = node.as_any().downcast_ref::<DataSourceExec>() {
                    let data_source = plan.data_source();
                    let any_source = data_source.as_any();
                    let source = any_source.downcast_ref::<FileScanConfig>().unwrap();
                    let file_source = source.file_source();
                    let any_file_source = file_source.as_any();
                    // The downcast itself is the assertion: a leftover plain
                    // ParquetSource would panic here.
                    let _parquet_source = any_file_source
                        .downcast_ref::<LiquidParquetSource>()
                        .unwrap();
                    let schema = source.file_schema.as_ref();
                    assert_eq!(schema, expected_schema.as_ref());
                }
                Ok(TreeNodeRecursion::Continue)
            })
            .unwrap();
    }

    /// End-to-end: plan a filtered SELECT over a real Parquet fixture and
    /// check the physical plan is rewritten to use the liquid cache source.
    #[tokio::test]
    async fn test_plan_rewrite() {
        let ctx = SessionContext::new();
        ctx.register_parquet(
            "nano_hits",
            "../../examples/nano_hits.parquet",
            Default::default(),
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT * FROM nano_hits WHERE \"URL\" like 'https://%' limit 10")
            .await
            .unwrap();
        let plan = df.create_physical_plan().await.unwrap();
        rewrite_plan_inner(plan.clone());
    }
}