liquid_cache_datafusion/reader/plantime/source.rs

use super::opener::LiquidParquetOpener;
use crate::cache::LiquidCacheParquetRef;
use ahash::{HashMap, HashMapExt};
use arrow_schema::Schema;
use bytes::Bytes;
use datafusion::{
    config::TableParquetOptions,
    datasource::{
        listing::PartitionedFile,
        physical_plan::{
            FileScanConfig, FileSource, ParquetFileMetrics, ParquetFileReaderFactory,
            ParquetSource, parquet::PagePruningAccessPlanFilter,
        },
        table_schema::TableSchema,
    },
    error::Result,
    physical_expr::projection::ProjectionExprs,
    physical_expr_adapter::DefaultPhysicalExprAdapterFactory,
    physical_optimizer::pruning::PruningPredicate,
    physical_plan::{
        PhysicalExpr,
        metrics::{ExecutionPlanMetricsSet, MetricBuilder},
    },
};
use futures::{FutureExt, future::BoxFuture};
use object_store::{ObjectStore, path::Path};
use parquet::{
    arrow::{
        arrow_reader::ArrowReaderOptions,
        async_reader::{AsyncFileReader, ParquetObjectReader},
    },
    file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader},
};
use std::{
    any::Any,
    ops::Range,
    sync::{Arc, LazyLock},
};
use tokio::sync::RwLock;

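/// Process-wide cache of Parquet footer metadata (including page indexes),
/// keyed by object-store path, so each file's metadata is fetched and decoded
/// at most once.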
static META_CACHE: LazyLock<MetadataCache> = LazyLock::new(MetadataCache::new);

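/// A [`ParquetFileReaderFactory`] whose readers answer metadata requests from
/// [`META_CACHE`], so repeated scans of the same file skip footer round-trips.
///
/// A construction sketch (the in-memory store is just for illustration):
///
/// ```ignore
/// use std::sync::Arc;
/// use object_store::{ObjectStore, memory::InMemory};
///
/// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// let factory = CachedMetaReaderFactory::new(store);
/// ```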
#[derive(Debug)]
pub(crate) struct CachedMetaReaderFactory {
    store: Arc<dyn ObjectStore>,
}

impl CachedMetaReaderFactory {
    pub(crate) fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self { store }
    }

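    /// Builds a [`ParquetMetadataCacheReader`] for one partitioned file, wiring
    /// up per-file metrics and an optional footer size hint.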
    pub(crate) fn create_liquid_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> ParquetMetadataCacheReader {
        let path = partitioned_file.object_meta.location.clone();
        let store = Arc::clone(&self.store);
        let mut inner = ParquetObjectReader::new(store, path.clone());

        if let Some(hint) = metadata_size_hint {
            inner = inner.with_footer_size_hint(hint);
        }

        ParquetMetadataCacheReader {
            file_metrics: ParquetFileMetrics::new(partition_index, path.as_ref(), metrics),
            inner,
            path,
        }
    }
}

impl ParquetFileReaderFactory for CachedMetaReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Box<dyn AsyncFileReader + Send>> {
        let reader = self.create_liquid_reader(
            partition_index,
            partitioned_file,
            metadata_size_hint,
            metrics,
        );
        Ok(Box::new(reader))
    }
}

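/// Path-keyed metadata map guarded by a [`tokio::sync::RwLock`]; [`META_CACHE`]
/// holds the shared instance.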
struct MetadataCache {
    val: RwLock<HashMap<Path, Arc<ParquetMetaData>>>,
}

impl MetadataCache {
    fn new() -> Self {
        Self {
            val: RwLock::new(HashMap::new()),
        }
    }
}

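/// An [`AsyncFileReader`] that forwards byte reads to [`ParquetObjectReader`]
/// while recording `bytes_scanned`, and answers `get_metadata` from
/// [`META_CACHE`].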
#[derive(Clone)]
pub struct ParquetMetadataCacheReader {
    file_metrics: ParquetFileMetrics,
    inner: ParquetObjectReader,
    path: Path,
}

impl AsyncFileReader for ParquetMetadataCacheReader {
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        self.file_metrics.bytes_scanned.add(total as usize);
        self.inner.get_byte_ranges(ranges)
    }

    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        self.file_metrics
            .bytes_scanned
            .add((range.end - range.start) as usize);
        self.inner.get_bytes(range)
    }

    fn get_metadata(
        &mut self,
        options: Option<&ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        let path = self.path.clone();
        let options = options.cloned();
        async move {
            // Fast path: serve the metadata from the cache under a read lock.
            {
                let cache = META_CACHE.val.read().await;
                if let Some(meta) = cache.get(&path) {
                    return Ok(meta.clone());
                }
            }

            // Slow path: take the write lock and re-check the entry, since
            // another task may have populated it while we waited.
            let mut cache = META_CACHE.val.write().await;
            match cache.entry(path.clone()) {
                std::collections::hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
                std::collections::hash_map::Entry::Vacant(entry) => {
                    let meta = self.inner.get_metadata(options.as_ref()).await?;
                    let meta = Arc::try_unwrap(meta).unwrap_or_else(|e| e.as_ref().clone());
                    // Load the optional page index so page-level pruning can
                    // use it, then cache the completed metadata.
                    let mut reader = ParquetMetaDataReader::new_with_metadata(meta.clone())
                        .with_page_index_policy(PageIndexPolicy::Optional);
                    reader.load_page_index(&mut self.inner).await?;
                    let meta = Arc::new(reader.finish()?);
                    entry.insert(meta.clone());
                    Ok(meta)
                }
            }
        }
        .boxed()
    }
}

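/// A [`FileSource`] that scans Parquet through the liquid cache rather than the
/// stock DataFusion Parquet reader, carrying the predicate, projection, and
/// pruning state needed to build a [`LiquidParquetOpener`] per partition.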
#[derive(Clone)]
pub struct LiquidParquetSource {
    metrics: ExecutionPlanMetricsSet,
    predicate: Option<Arc<dyn PhysicalExpr>>,
    pruning_predicate: Option<Arc<PruningPredicate>>,
    page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
    table_parquet_options: TableParquetOptions,
    liquid_cache: LiquidCacheParquetRef,
    batch_size: Option<usize>,
    projection: ProjectionExprs,
    table_schema: TableSchema,
    span: Option<Arc<fastrace::Span>>,
}

impl LiquidParquetSource {
    fn reorder_filters(&self) -> bool {
        self.table_parquet_options.global.reorder_filters
    }

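    /// Returns a copy of this source that reports traces as children of `span`.
    ///
    /// A usage sketch (assuming a `fastrace` root span is already set up):
    ///
    /// ```ignore
    /// let traced = source.with_span(fastrace::Span::enter_with_local_parent("scan"));
    /// ```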
    pub fn with_span(&self, span: fastrace::Span) -> Self {
        Self {
            span: Some(Arc::new(span)),
            ..self.clone()
        }
    }

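    /// Returns a copy of this source with `table_schema` replaced.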
    pub fn with_table_schema(&self, table_schema: TableSchema) -> Self {
        Self {
            table_schema,
            ..self.clone()
        }
    }

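    /// Installs `predicate`, then derives a row-group [`PruningPredicate`] and a
    /// page-level [`PagePruningAccessPlanFilter`] from it against `file_schema`.
    /// Predicates that cannot be converted are skipped and counted in the
    /// `num_predicate_creation_errors` metric.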
    pub fn with_predicate(
        mut self,
        file_schema: Arc<Schema>,
        predicate: Arc<dyn PhysicalExpr>,
    ) -> Self {
        let metrics = ExecutionPlanMetricsSet::new();
        let predicate_creation_errors =
            MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");

        self.metrics = metrics;
        self.predicate = Some(Arc::clone(&predicate));

        match PruningPredicate::try_new(Arc::clone(&predicate), Arc::clone(&file_schema)) {
            Ok(pruning_predicate) => {
                // An always-true predicate prunes nothing, so don't keep it.
                if !pruning_predicate.always_true() {
                    self.pruning_predicate = Some(Arc::new(pruning_predicate));
                }
            }
            Err(e) => {
                log::debug!("Could not create pruning predicate: {e}");
                predicate_creation_errors.add(1);
            }
        }

        let page_pruning_predicate = Arc::new(PagePruningAccessPlanFilter::new(
            &predicate,
            Arc::clone(&file_schema),
        ));
        self.page_pruning_predicate = Some(page_pruning_predicate);

        self
    }

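    /// Builds a cache-aware source from a stock [`ParquetSource`], copying its
    /// schema, Parquet options, metrics, and filter, and defaulting the batch
    /// size to the cache's configured batch size.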
    pub fn from_parquet_source(source: ParquetSource, liquid_cache: LiquidCacheParquetRef) -> Self {
        let predicate = source.filter();

        let table_schema = source.table_schema().clone();
        let file_schema = table_schema.file_schema().clone();
        // Default to projecting every table column when the source has no
        // explicit projection.
        let projection = source.projection().cloned().unwrap_or_else(|| {
            let table_schema = table_schema.table_schema();
            ProjectionExprs::from_indices(
                &(0..table_schema.fields().len()).collect::<Vec<_>>(),
                table_schema,
            )
        });
        let mut v = Self {
            table_schema,
            table_parquet_options: source.table_parquet_options().clone(),
            batch_size: Some(liquid_cache.batch_size()),
            liquid_cache,
            projection,
            metrics: source.metrics().clone(),
            predicate: None,
            pruning_predicate: None,
            page_pruning_predicate: None,
            span: None,
        };

        if let Some(predicate) = predicate {
            v = v.with_predicate(file_schema, predicate);
        }

        v
    }

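    /// Returns the filter predicate, if one has been set.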
    pub fn predicate(&self) -> Option<Arc<dyn PhysicalExpr>> {
        self.predicate.clone()
    }
}

impl FileSource for LiquidParquetSource {
    fn as_any(&self) -> &dyn Any {
        self
    }

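    /// Assembles a [`LiquidParquetOpener`] for one partition, falling back to
    /// [`DefaultPhysicalExprAdapterFactory`] when the scan config supplies no
    /// expression adapter.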
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Result<Arc<dyn datafusion::datasource::physical_plan::FileOpener>> {
        let expr_adapter_factory = base_config
            .expr_adapter_factory
            .clone()
            .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory) as _);

        let reader_factory = Arc::new(CachedMetaReaderFactory::new(object_store));

        let execution_span = self
            .span
            .clone()
            .map(|span| fastrace::Span::enter_with_parent(format!("opener_{partition}"), &span));
        let opener = LiquidParquetOpener::new(
            partition,
            self.projection.clone(),
            self.batch_size
                .expect("Batch size must be set before creating LiquidParquetOpener"),
            base_config.limit,
            self.predicate.clone(),
            self.table_schema.clone(),
            self.metrics.clone(),
            self.liquid_cache.clone(),
            reader_factory,
            self.reorder_filters(),
            expr_adapter_factory,
            execution_span.map(Arc::new),
        );

        Ok(Arc::new(opener))
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn table_schema(&self) -> &TableSchema {
        &self.table_schema
    }

    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        let mut source = self.clone();
        source.projection = self.projection.try_merge(projection)?;
        Ok(Some(Arc::new(source)))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        Some(&self.projection)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        "liquid_parquet"
    }
}