liquid_cache_parquet/optimizers/mod.rs

mod date_extract_opt;

use std::{collections::HashMap, sync::Arc};

use arrow_schema::{Field, Schema};
use datafusion::{
    catalog::memory::DataSourceExec,
    common::tree_node::{Transformed, TreeNode, TreeNodeRecursion},
    config::ConfigOptions,
    datasource::{physical_plan::ParquetSource, source::DataSource},
    physical_optimizer::PhysicalOptimizerRule,
    physical_plan::ExecutionPlan,
};
pub use date_extract_opt::DateExtractOptimizer;

use crate::{
    LiquidCacheRef, LiquidParquetSource, optimizers::date_extract_opt::metadata_from_factory,
};

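/// Field-metadata key under which a column's date mapping (produced by the
/// schema adapter factory) is stored on the rewritten file schema.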
const DATE_MAPPING_METADATA_KEY: &str = "liquid.cache.date_mapping";

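/// Physical optimizer rule for local (in-process) mode: swaps every Parquet
/// `DataSourceExec` in the plan for one backed by a `LiquidParquetSource`.
///
/// Usage sketch, assuming DataFusion's `SessionStateBuilder` API; the exact
/// session wiring in this crate may differ:
///
/// ```ignore
/// let state = SessionStateBuilder::new()
///     .with_physical_optimizer_rules(vec![Arc::new(
///         LocalModeOptimizer::with_cache(cache),
///     )])
///     .build();
/// ```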
#[derive(Debug)]
pub struct LocalModeOptimizer {
    cache: LiquidCacheRef,
}

impl LocalModeOptimizer {
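    /// Builds the rule around a shared handle to the liquid cache.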
    pub fn with_cache(cache: LiquidCacheRef) -> Self {
        Self { cache }
    }
}

impl PhysicalOptimizerRule for LocalModeOptimizer {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>, datafusion::error::DataFusionError> {
        Ok(rewrite_data_source_plan(plan, &self.cache))
    }

    fn name(&self) -> &str {
        "LocalModeLiquidCacheOptimizer"
    }

    fn schema_check(&self) -> bool {
        true
    }
}

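/// Rewrites every Parquet scan in `plan` to read through the liquid cache.
///
/// Only `DataSourceExec` nodes backed by a `ParquetSource` are rewritten; all
/// other nodes pass through unchanged.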
pub fn rewrite_data_source_plan(
    plan: Arc<dyn ExecutionPlan>,
    cache: &LiquidCacheRef,
) -> Arc<dyn ExecutionPlan> {
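    // Walk the plan bottom-up, rewriting each qualifying scan in place.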
    let rewritten = plan
        .transform_up(|node| {
            let any_plan = node.as_any();
            if let Some(data_source_exec) = any_plan.downcast_ref::<DataSourceExec>() {
                if let Some((file_scan_config, parquet_source)) =
                    data_source_exec.downcast_to_file_source::<ParquetSource>()
                {
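                    // Clone the scan config so it can be mutated without
                    // touching the original plan node.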
                    let mut new_config = file_scan_config.clone();
                    if let Some(schema_factory) =
                        file_scan_config.file_source().schema_adapter_factory()
                    {
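                        // Record the factory's per-column date mappings as
                        // field metadata so the cached reader can recover
                        // them from the file schema alone.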
                        let file_schema = file_scan_config.file_schema.clone();
                        let mut new_fields = vec![];
                        for field in file_schema.fields() {
                            let date_metadata =
                                metadata_from_factory(&schema_factory, field.name());
                            if let Some(date_metadata) = date_metadata {
                                let new_field =
                                    Field::clone(field.as_ref()).with_metadata(HashMap::from([(
                                        DATE_MAPPING_METADATA_KEY.to_string(),
                                        date_metadata,
                                    )]));
                                new_fields.push(Arc::new(new_field));
                            } else {
                                new_fields.push(field.clone());
                            }
                        }
                        let new_schema = Schema::new(new_fields);
                        new_config.file_schema = Arc::new(new_schema);
                    }
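                    // Swap the stock ParquetSource for a cache-aware source.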
                    let new_source = LiquidParquetSource::from_parquet_source(
                        parquet_source.clone(),
                        file_scan_config.file_schema.clone(),
                        cache.clone(),
                    );
                    new_config.file_source = Arc::new(new_source);
                    let new_file_source: Arc<dyn DataSource> = Arc::new(new_config);
                    let new_plan = Arc::new(DataSourceExec::new(new_file_source));

                    return Ok(Transformed::new(
                        new_plan,
                        true,
                        TreeNodeRecursion::Continue,
                    ));
                }

                return Ok(Transformed::no(node));
            }
            Ok(Transformed::no(node))
        })
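        // The closure is infallible, so this unwrap can never panic.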
        .unwrap();
    rewritten.data
}

#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use datafusion::{datasource::physical_plan::FileScanConfig, prelude::SessionContext};
    use liquid_cache_storage::{
        cache::squeeze_policies::TranscodeSqueezeEvict, cache_policies::LiquidPolicy,
    };

    use crate::LiquidCache;
    use liquid_cache_common::IoMode;

    use super::*;

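    /// Rewrites `plan` against a fresh cache and asserts that every scan now
    /// uses a `LiquidParquetSource` while its file schema is preserved.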
    fn rewrite_plan_inner(plan: Arc<dyn ExecutionPlan>) {
        let expected_schema = plan.schema();
        let liquid_cache = Arc::new(LiquidCache::new(
            8192,
            1_000_000,
            PathBuf::from("test"),
            Box::new(LiquidPolicy::new()),
            Box::new(TranscodeSqueezeEvict),
            IoMode::Uring,
        ));
        let rewritten = rewrite_data_source_plan(plan, &liquid_cache);

        rewritten
            .apply(|node| {
                if let Some(plan) = node.as_any().downcast_ref::<DataSourceExec>() {
                    let data_source = plan.data_source();
                    let any_source = data_source.as_any();
                    let source = any_source.downcast_ref::<FileScanConfig>().unwrap();
                    let file_source = source.file_source();
                    let any_file_source = file_source.as_any();
                    let _parquet_source = any_file_source
                        .downcast_ref::<LiquidParquetSource>()
                        .unwrap();
                    let schema = source.file_schema.as_ref();
                    assert_eq!(schema, expected_schema.as_ref());
                }
                Ok(TreeNodeRecursion::Continue)
            })
            .unwrap();
    }

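    // End-to-end rewrite over a real Parquet file from the examples directory.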
    #[tokio::test]
    async fn test_plan_rewrite() {
        let ctx = SessionContext::new();
        ctx.register_parquet(
            "nano_hits",
            "../../examples/nano_hits.parquet",
            Default::default(),
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT * FROM nano_hits WHERE \"URL\" like 'https://%' limit 10")
            .await
            .unwrap();
        let plan = df.create_physical_plan().await.unwrap();
        rewrite_plan_inner(plan.clone());
    }
}