// liquid_cache_datafusion/optimizers/mod.rs
mod lineage_opt;
5use std::{str::FromStr, sync::Arc};
6
7use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
8use datafusion::{
9 catalog::memory::DataSourceExec,
10 common::tree_node::{Transformed, TreeNode, TreeNodeRecursion},
11 config::ConfigOptions,
12 datasource::{
13 physical_plan::{FileSource, ParquetSource},
14 source::DataSource,
15 table_schema::TableSchema,
16 },
17 physical_expr_adapter::PhysicalExprAdapterFactory,
18 physical_optimizer::PhysicalOptimizerRule,
19 physical_plan::ExecutionPlan,
20};
21pub use lineage_opt::LineageOptimizer;
22pub(crate) use lineage_opt::VariantField;
23
24use crate::{
25 LiquidCacheParquetRef, LiquidParquetSource,
26 optimizers::lineage_opt::{ColumnAnnotation, metadata_from_factory, serialize_date_part},
27};
28use liquid_cache::utils::VariantSchema;
29use serde::{Deserialize, Serialize};
30
/// Field-metadata key holding a serialized date-part mapping (written via `serialize_date_part`).
pub(crate) const DATE_MAPPING_METADATA_KEY: &str = "liquid.cache.date_mapping";
/// Field-metadata key holding variant shredding paths — either a JSON list of
/// entries or, in the legacy form, a single raw path string.
pub(crate) const VARIANT_MAPPING_METADATA_KEY: &str = "liquid.cache.variant_path";
/// Field-metadata key with the target Arrow type for a legacy single-path variant mapping.
pub(crate) const VARIANT_MAPPING_TYPE_METADATA_KEY: &str = "liquid.cache.variant_type";
/// Field-metadata key marking a column annotated for substring search (value "substring").
pub(crate) const STRING_FINGERPRINT_METADATA_KEY: &str = "liquid.cache.string_fingerprint";
35
/// JSON shape for one variant shredding mapping stored in field metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct VariantMappingSerdeEntry {
    // Path into the variant value to shred (format consumed by `VariantSchema::insert_path`).
    path: String,
    // Optional target Arrow type, serialized via `DataType::to_string`;
    // omitted from the JSON object when absent.
    #[serde(rename = "type", skip_serializing_if = "Option::is_none")]
    data_type: Option<String>,
}
42
43pub(crate) fn serialize_variant_mappings(fields: &[VariantField]) -> Option<String> {
44 if fields.is_empty() {
45 return None;
46 }
47
48 let entries: Vec<VariantMappingSerdeEntry> = fields
49 .iter()
50 .map(|field| VariantMappingSerdeEntry {
51 path: field.path.clone(),
52 data_type: field
53 .data_type
54 .as_ref()
55 .map(|data_type| data_type.to_string()),
56 })
57 .collect();
58
59 serde_json::to_string(&entries).ok()
60}
61
62fn deserialize_variant_mappings(raw: &str) -> Option<Vec<VariantField>> {
63 let entries: Vec<VariantMappingSerdeEntry> = serde_json::from_str(raw).ok()?;
64 let mut fields = Vec::with_capacity(entries.len());
65 for entry in entries {
66 let data_type = match entry.data_type {
67 Some(spec) => Some(DataType::from_str(&spec).ok()?),
68 None => None,
69 };
70 fields.push(VariantField {
71 path: entry.path,
72 data_type,
73 });
74 }
75 Some(fields)
76}
77
78pub(crate) fn variant_mappings_from_field(field: &Field) -> Option<Vec<VariantField>> {
79 let metadata = field.metadata();
80 let raw = metadata.get(VARIANT_MAPPING_METADATA_KEY)?;
81 if let Some(parsed) = deserialize_variant_mappings(raw) {
82 return Some(parsed);
83 }
84
85 let data_type = metadata
86 .get(VARIANT_MAPPING_TYPE_METADATA_KEY)
87 .and_then(|spec| DataType::from_str(spec).ok());
88
89 Some(vec![VariantField {
90 path: raw.clone(),
91 data_type,
92 }])
93}
94
/// Physical optimizer rule that rewrites parquet data sources in a plan to
/// read through the liquid cache (intended for local, in-process mode —
/// see `rewrite_data_source_plan`).
#[derive(Debug)]
pub struct LocalModeOptimizer {
    // Shared handle to the liquid parquet cache backing rewritten scans.
    cache: LiquidCacheParquetRef,
    // When true, variant columns get their type and metadata enriched
    // (eagerly shredded) during the rewrite.
    eager_shredding: bool,
}
104
105impl LocalModeOptimizer {
106 pub fn new(cache: LiquidCacheParquetRef, eager_shredding: bool) -> Self {
108 Self {
109 cache,
110 eager_shredding,
111 }
112 }
113
114 pub fn with_cache(cache: LiquidCacheParquetRef) -> Self {
116 Self {
117 cache,
118 eager_shredding: true,
119 }
120 }
121}
122
impl PhysicalOptimizerRule for LocalModeOptimizer {
    /// Walks the plan and swaps every parquet data source for a
    /// liquid-cache-backed source. Infallible by construction.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>, datafusion::error::DataFusionError> {
        Ok(rewrite_data_source_plan(
            plan,
            &self.cache,
            self.eager_shredding,
        ))
    }

    fn name(&self) -> &str {
        "LocalModeLiquidCacheOptimizer"
    }

    /// Schema checking is disabled: the rewrite may enrich field metadata and
    /// variant field types, so the post-rewrite schema can legitimately differ
    /// from the input schema.
    fn schema_check(&self) -> bool {
        false
    }
}
146
147pub fn rewrite_data_source_plan(
149 plan: Arc<dyn ExecutionPlan>,
150 cache: &LiquidCacheParquetRef,
151 eager_shredding: bool,
152) -> Arc<dyn ExecutionPlan> {
153 let rewritten = plan
154 .transform_up(|node| try_optimize_parquet_source(node, cache, eager_shredding))
155 .unwrap();
156 rewritten.data
157}
158
/// If `plan` is a `DataSourceExec` backed by a `ParquetSource`, rebuilds it
/// around a [`LiquidParquetSource`] tied to `cache`; otherwise returns the
/// node unchanged. Never returns `Err` (the `Result` satisfies `transform_up`).
fn try_optimize_parquet_source(
    plan: Arc<dyn ExecutionPlan>,
    cache: &LiquidCacheParquetRef,
    eager_shredding: bool,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>, datafusion::error::DataFusionError> {
    let any_plan = plan.as_any();
    if let Some(data_source_exec) = any_plan.downcast_ref::<DataSourceExec>()
        && let Some((file_scan_config, parquet_source)) =
            data_source_exec.downcast_to_file_source::<ParquetSource>()
    {
        let mut new_config = file_scan_config.clone();

        // Wrap the vanilla parquet source so reads go through the liquid cache.
        let mut new_source =
            LiquidParquetSource::from_parquet_source(parquet_source.clone(), cache.clone());
        // When an expression adapter factory is attached, fold its per-column
        // annotations into the file schema's field metadata (and, for variant
        // columns, the field's data type) so later stages can act on them.
        if let Some(expr_adapter_factory) = file_scan_config.expr_adapter_factory.as_ref() {
            let new_schema = enrich_source_schema(
                file_scan_config.file_schema(),
                expr_adapter_factory,
                eager_shredding,
            );
            // Keep the partition columns from the source's existing table schema.
            let table_partition_cols = new_source.table_schema().table_partition_cols();
            let new_table_schema =
                TableSchema::new(Arc::new(new_schema), table_partition_cols.clone());
            new_source = new_source.with_table_schema(new_table_schema);
        }

        new_config.file_source = Arc::new(new_source);
        let new_file_source: Arc<dyn DataSource> = Arc::new(new_config);
        let new_plan = Arc::new(DataSourceExec::new(new_file_source));

        return Ok(Transformed::new(
            new_plan,
            true,
            TreeNodeRecursion::Continue,
        ));
    }
    // Not a parquet data source — pass the node through untouched.
    Ok(Transformed::no(plan))
}
197
198fn enrich_source_schema(
199 file_schema: &SchemaRef,
200 expr_adapter_factory: &Arc<dyn PhysicalExprAdapterFactory>,
201 eager_shredding: bool,
202) -> Schema {
203 let mut new_fields = vec![];
204 for field in file_schema.fields() {
205 if let Some(annotation) = metadata_from_factory(expr_adapter_factory, field.name()) {
206 new_fields.push(process_field_annotation(field, annotation, eager_shredding));
207 } else {
208 new_fields.push(field.clone());
209 }
210 }
211 Schema::new(new_fields)
212}
213
214fn process_field_annotation(
215 field: &Arc<Field>,
216 annotation: ColumnAnnotation,
217 eager_shredding: bool,
218) -> Arc<Field> {
219 let mut field_metadata = field.metadata().clone();
220 let mut updated_field = Field::clone(field.as_ref());
221 match annotation {
222 ColumnAnnotation::DatePart(unit) => {
223 field_metadata.insert(
224 DATE_MAPPING_METADATA_KEY.to_string(),
225 serialize_date_part(&unit),
226 );
227 }
228 ColumnAnnotation::VariantPaths(paths) => {
229 if eager_shredding {
230 if let Some(serialized) = serialize_variant_mappings(&paths) {
231 field_metadata.insert(VARIANT_MAPPING_METADATA_KEY.to_string(), serialized);
232 }
233 updated_field = enrich_variant_field_type(&updated_field, &paths);
234 }
235 }
236 ColumnAnnotation::SubstringSearch => {
237 field_metadata.insert(
238 STRING_FINGERPRINT_METADATA_KEY.to_string(),
239 "substring".into(),
240 );
241 }
242 }
243 Arc::new(updated_field.with_metadata(field_metadata))
244}
245
/// Rewrites a variant column's `typed_value` child so it contains a typed
/// entry for every mapping in `fields` that carries a data type. Returns an
/// unmodified clone when no mapping is typed, or when the field's type is not
/// a struct.
pub(crate) fn enrich_variant_field_type(field: &Field, fields: &[VariantField]) -> Field {
    // Only mappings with an explicit target type affect the shredded schema.
    let typed_specs: Vec<&VariantField> = fields
        .iter()
        .filter(|field| field.data_type.is_some())
        .collect();
    if typed_specs.is_empty() {
        return Field::clone(field);
    }

    let new_type = match field.data_type() {
        DataType::Struct(children) => {
            // +1 capacity: typed_value may need to be appended if absent.
            let mut rewritten = Vec::with_capacity(children.len() + 1);
            let mut replaced = false;
            for child in children.iter() {
                if child.name() == "typed_value" {
                    // Merge the requested typed paths into the existing typed_value child.
                    rewritten.push(build_variant_typed_value_field(
                        Some(child.as_ref()),
                        &typed_specs,
                    ));
                    replaced = true;
                } else {
                    let mut child_field = child.as_ref().clone();
                    if child_field.name() == "value" {
                        // Rebuild the `value` child as nullable (third arg `true`),
                        // keeping its type and metadata.
                        // NOTE(review): presumably because shredded rows may have no
                        // residual value — confirm against the variant spec.
                        child_field =
                            Field::new(child_field.name(), child_field.data_type().clone(), true)
                                .with_metadata(child_field.metadata().clone());
                    }
                    rewritten.push(Arc::new(child_field));
                }
            }
            if !replaced {
                // No typed_value child existed — append a freshly built one.
                rewritten.push(build_variant_typed_value_field(None, &typed_specs));
            }
            DataType::Struct(Fields::from(rewritten))
        }
        // Non-struct field types are left unchanged.
        other => other.clone(),
    };
    Field::clone(field).with_data_type(new_type)
}
285
286pub(crate) fn enrich_schema_for_cache(schema: &SchemaRef) -> SchemaRef {
287 let mut fields = vec![];
288 for field in schema.fields() {
289 let new_field = if let Some(mappings) = variant_mappings_from_field(field.as_ref()) {
290 Arc::new(enrich_variant_field_type(field.as_ref(), &mappings))
291 } else {
292 field.clone()
293 };
294 fields.push(new_field);
295 }
296 Arc::new(Schema::new(fields))
297}
298
299fn build_variant_typed_value_field(
300 existing: Option<&Field>,
301 specs: &[&VariantField],
302) -> Arc<Field> {
303 let mut schema = VariantSchema::new(existing);
304 for spec in specs {
305 if let Some(data_type) = spec.data_type.as_ref() {
306 schema.insert_path(&spec.path, data_type);
307 }
308 }
309
310 Arc::new(Field::new(
311 "typed_value",
312 DataType::Struct(Fields::from(schema.typed_fields())),
313 true,
314 ))
315}
316
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use datafusion::{datasource::physical_plan::FileScanConfig, prelude::SessionContext};
    use liquid_cache::{
        cache::{AlwaysHydrate, squeeze_policies::TranscodeSqueezeEvict},
        cache_policies::LiquidPolicy,
    };

    use crate::LiquidCacheParquet;
    use liquid_cache_common::IoMode;

    use super::*;

    // Rewrites `plan` against a throwaway cache and asserts that every
    // DataSourceExec now carries a LiquidParquetSource while its file schema
    // is preserved.
    fn rewrite_plan_inner(plan: Arc<dyn ExecutionPlan>) {
        let expected_schema = plan.schema();
        let liquid_cache = Arc::new(LiquidCacheParquet::new(
            8192,
            1000000,
            PathBuf::from("test"),
            Box::new(LiquidPolicy::new()),
            Box::new(TranscodeSqueezeEvict),
            Box::new(AlwaysHydrate::new()),
            IoMode::Uring,
        ));
        let rewritten = rewrite_data_source_plan(plan, &liquid_cache, true);

        rewritten
            .apply(|node| {
                if let Some(plan) = node.as_any().downcast_ref::<DataSourceExec>() {
                    let data_source = plan.data_source();
                    let any_source = data_source.as_any();
                    let source = any_source.downcast_ref::<FileScanConfig>().unwrap();
                    let file_source = source.file_source();
                    let any_file_source = file_source.as_any();
                    // The rewrite must have swapped in a LiquidParquetSource.
                    let _parquet_source = any_file_source
                        .downcast_ref::<LiquidParquetSource>()
                        .unwrap();
                    // The file schema must survive the rewrite unchanged.
                    let schema = source.file_schema().as_ref();
                    assert_eq!(schema, expected_schema.as_ref());
                }
                Ok(TreeNodeRecursion::Continue)
            })
            .unwrap();
    }

    // End-to-end check: plan a filtered scan over a sample parquet file and
    // verify the rewrite applies cleanly to the resulting physical plan.
    #[tokio::test]
    async fn test_plan_rewrite() {
        let ctx = SessionContext::new();
        ctx.register_parquet(
            "nano_hits",
            "../../examples/nano_hits.parquet",
            Default::default(),
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT * FROM nano_hits WHERE \"URL\" like 'https://%' limit 10")
            .await
            .unwrap();
        let plan = df.create_physical_plan().await.unwrap();
        rewrite_plan_inner(plan.clone());
    }
}