liquid_cache_datafusion/reader/plantime/source.rs

use super::opener::LiquidParquetOpener;
use crate::cache::LiquidCacheParquetRef;
use ahash::{HashMap, HashMapExt};
use arrow_schema::Schema;
use bytes::Bytes;
use datafusion::{
    config::TableParquetOptions,
    datasource::{
        listing::PartitionedFile,
        physical_plan::{
            FileScanConfig, FileSource, ParquetFileMetrics, ParquetFileReaderFactory,
            ParquetSource, parquet::PagePruningAccessPlanFilter,
        },
        table_schema::TableSchema,
    },
    error::Result,
    physical_expr::projection::ProjectionExprs,
    physical_expr_adapter::DefaultPhysicalExprAdapterFactory,
    physical_optimizer::pruning::PruningPredicate,
    physical_plan::{
        PhysicalExpr,
        metrics::{ExecutionPlanMetricsSet, MetricBuilder},
    },
};
use futures::{FutureExt, future::BoxFuture};
use object_store::{ObjectStore, path::Path};
use parquet::{
    arrow::{
        arrow_reader::ArrowReaderOptions,
        async_reader::{AsyncFileReader, ParquetObjectReader},
    },
    file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader},
};
use std::{
    any::Any,
    ops::Range,
    sync::{Arc, LazyLock},
};
use tokio::sync::RwLock;

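/// Process-wide Parquet metadata cache, keyed by object-store path. Note that
/// no code in this module ever removes an entry, so cached metadata lives for
/// the life of the process.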
static META_CACHE: LazyLock<MetadataCache> = LazyLock::new(MetadataCache::new);

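/// Factory that produces [`ParquetMetadataCacheReader`]s, which record
/// bytes-scanned metrics and serve footer metadata from [`META_CACHE`].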
#[derive(Debug)]
pub(crate) struct CachedMetaReaderFactory {
    store: Arc<dyn ObjectStore>,
}

impl CachedMetaReaderFactory {
    pub(crate) fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self { store }
    }

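    /// Build a reader for one partitioned file, wiring its byte counters into
    /// the per-partition [`ParquetFileMetrics`].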
    pub(crate) fn create_liquid_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> ParquetMetadataCacheReader {
        let path = partitioned_file.object_meta.location.clone();
        let store = Arc::clone(&self.store);
        let mut inner = ParquetObjectReader::new(store, path.clone());

        if let Some(hint) = metadata_size_hint {
            inner = inner.with_footer_size_hint(hint);
        }

        ParquetMetadataCacheReader {
            file_metrics: ParquetFileMetrics::new(partition_index, path.as_ref(), metrics),
            inner,
            path,
        }
    }
}

impl ParquetFileReaderFactory for CachedMetaReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Box<dyn AsyncFileReader + Send>> {
        let reader = self.create_liquid_reader(
            partition_index,
            partitioned_file,
            metadata_size_hint,
            metrics,
        );
        Ok(Box::new(reader))
    }
}

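/// Map from file path to parsed Parquet metadata, guarded by an async
/// [`RwLock`] so concurrent readers can share cached entries.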
struct MetadataCache {
    val: RwLock<HashMap<Path, Arc<ParquetMetaData>>>,
}

impl MetadataCache {
    fn new() -> Self {
        Self {
            val: RwLock::new(HashMap::new()),
        }
    }
}

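/// An [`AsyncFileReader`] that counts scanned bytes into the file's metrics
/// and resolves footer metadata through the shared [`META_CACHE`].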
#[derive(Clone)]
pub struct ParquetMetadataCacheReader {
    file_metrics: ParquetFileMetrics,
    inner: ParquetObjectReader,
    path: Path,
}

impl AsyncFileReader for ParquetMetadataCacheReader {
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
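        // Record the total bytes these ranges will fetch before delegating.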
        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        self.file_metrics.bytes_scanned.add(total as usize);
        self.inner.get_byte_ranges(ranges)
    }

    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        self.file_metrics
            .bytes_scanned
            .add((range.end - range.start) as usize);
        self.inner.get_bytes(range)
    }

    fn get_metadata(
        &mut self,
        options: Option<&ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        let path = self.path.clone();
        let options = options.cloned();
        async move {
            // Fast path: check the cache under the read lock.
            {
                let cache = META_CACHE.val.read().await;
                if let Some(meta) = cache.get(&path) {
                    return Ok(meta.clone());
                }
            }

            // Miss: take the write lock and re-check, since another task may
            // have inserted the entry between the two lock acquisitions.
            let mut cache = META_CACHE.val.write().await;
            match cache.entry(path.clone()) {
                std::collections::hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
                std::collections::hash_map::Entry::Vacant(entry) => {
                    let meta = self.inner.get_metadata(options.as_ref()).await?;
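                    // The page-index loader needs owned metadata, so unwrap
                    // the Arc (or clone the inner value if it is still shared).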
                    let meta = Arc::try_unwrap(meta).unwrap_or_else(|e| e.as_ref().clone());
                    let mut reader = ParquetMetaDataReader::new_with_metadata(meta.clone())
                        .with_page_index_policy(PageIndexPolicy::Optional);
                    reader.load_page_index(&mut self.inner).await?;
                    let meta = Arc::new(reader.finish()?);
                    entry.insert(meta.clone());
                    Ok(meta)
                }
            }
        }
        .boxed()
    }
}

/// The data source for LiquidCache
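///
/// A minimal usage sketch, assuming a configured `ParquetSource` and a
/// `LiquidCacheParquetRef` are already in hand (the bindings here are
/// illustrative, not part of this API):
///
/// ```ignore
/// // Wrap an existing ParquetSource so scans go through LiquidCache.
/// let source = LiquidParquetSource::from_parquet_source(parquet_source, cache)
///     .with_span(fastrace::Span::enter_with_local_parent("liquid_scan"));
/// // FileSource::with_batch_size returns an Arc<dyn FileSource> for planning.
/// let source: Arc<dyn FileSource> = source.with_batch_size(8192);
/// ```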
#[derive(Clone)]
pub struct LiquidParquetSource {
    metrics: ExecutionPlanMetricsSet,
    predicate: Option<Arc<dyn PhysicalExpr>>,
    pruning_predicate: Option<Arc<PruningPredicate>>,
    page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
    table_parquet_options: TableParquetOptions,
    liquid_cache: LiquidCacheParquetRef,
    batch_size: Option<usize>,
    projection: ProjectionExprs,
    table_schema: TableSchema,
    span: Option<Arc<fastrace::Span>>,
}

impl LiquidParquetSource {
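    /// Whether filter reordering is enabled in the table-level Parquet options.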
    fn reorder_filters(&self) -> bool {
        self.table_parquet_options.global.reorder_filters
    }

    /// Set the span for the LiquidParquetSource
    pub fn with_span(&self, span: fastrace::Span) -> Self {
        Self {
            span: Some(Arc::new(span)),
            ..self.clone()
        }
    }

    /// Set the table schema for the LiquidParquetSource
    pub fn with_table_schema(&self, table_schema: TableSchema) -> Self {
        Self {
            table_schema,
            ..self.clone()
        }
    }

    /// Set the predicate; this also derives the statistics-based
    /// `pruning_predicate` and the page-level `page_pruning_predicate` from it.
    pub fn with_predicate(
        mut self,
        file_schema: Arc<Schema>,
        predicate: Arc<dyn PhysicalExpr>,
    ) -> Self {
        let metrics = ExecutionPlanMetricsSet::new();
        let predicate_creation_errors =
            MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");

        self.metrics = metrics;
        self.predicate = Some(Arc::clone(&predicate));

        match PruningPredicate::try_new(Arc::clone(&predicate), Arc::clone(&file_schema)) {
            Ok(pruning_predicate) => {
                if !pruning_predicate.always_true() {
                    self.pruning_predicate = Some(Arc::new(pruning_predicate));
                }
            }
            Err(e) => {
                log::debug!("Could not create pruning predicate: {e}");
                predicate_creation_errors.add(1);
            }
        };

        let page_pruning_predicate = Arc::new(PagePruningAccessPlanFilter::new(
            &predicate,
            Arc::clone(&file_schema),
        ));
        self.page_pruning_predicate = Some(page_pruning_predicate);

        self
    }

    /// Create a new LiquidParquetSource from a ParquetSource
    pub fn from_parquet_source(source: ParquetSource, liquid_cache: LiquidCacheParquetRef) -> Self {
        let predicate = source.filter();

        let table_schema = source.table_schema().clone();
        let file_schema = table_schema.file_schema().clone();
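        // If the source carries no explicit projection, project every column
        // of the table schema.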
        let projection = source.projection().cloned().unwrap_or_else(|| {
            let table_schema = table_schema.table_schema();
            ProjectionExprs::from_indices(
                &(0..table_schema.fields().len()).collect::<Vec<_>>(),
                table_schema,
            )
        });
        let mut v = Self {
            table_schema,
            table_parquet_options: source.table_parquet_options().clone(),
            batch_size: Some(liquid_cache.batch_size()),
            liquid_cache,
            projection,
            metrics: source.metrics().clone(),
            predicate: None,
            pruning_predicate: None,
            page_pruning_predicate: None,
            span: None,
        };

        if let Some(predicate) = predicate {
            v = v.with_predicate(file_schema, predicate);
        }

        v
    }

    /// Get the predicate for the LiquidParquetSource
    pub fn predicate(&self) -> Option<Arc<dyn PhysicalExpr>> {
        self.predicate.clone()
    }
}

impl FileSource for LiquidParquetSource {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Result<Arc<dyn datafusion::datasource::physical_plan::FileOpener>> {
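        // Fall back to the default physical-expression adapter when the scan
        // config does not supply one.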
        let expr_adapter_factory = base_config
            .expr_adapter_factory
            .clone()
            .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory) as _);

        let reader_factory = Arc::new(CachedMetaReaderFactory::new(object_store));

        let execution_span = self
            .span
            .clone()
            .map(|span| fastrace::Span::enter_with_parent(format!("opener_{partition}"), &span));
        let opener = LiquidParquetOpener::new(
            partition,
            self.projection.clone(),
            self.batch_size
                .expect("Batch size must be set before creating LiquidParquetOpener"),
            base_config.limit,
            self.predicate.clone(),
            self.table_schema.clone(),
            self.metrics.clone(),
            self.liquid_cache.clone(),
            reader_factory,
            self.reorder_filters(),
            expr_adapter_factory,
            execution_span.map(Arc::new),
        );

        Ok(Arc::new(opener))
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn table_schema(&self) -> &TableSchema {
        &self.table_schema
    }

    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        let mut source = self.clone();
        source.projection = self.projection.try_merge(projection)?;
        Ok(Some(Arc::new(source)))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        Some(&self.projection)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        "liquid_parquet"
    }
}