datafusion_execution/cache/cache_manager.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cache::CacheAccessor;
19use crate::cache::DefaultListFilesCache;
20use crate::cache::file_statistics_cache::{
21 DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, DefaultFileStatisticsCache,
22 DefaultFilesMetadataCache,
23};
24use crate::cache::list_files_cache::ListFilesEntry;
25use crate::cache::list_files_cache::TableScopedPath;
26use datafusion_common::TableReference;
27use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx};
28use datafusion_common::stats::Precision;
29use datafusion_common::{Result, Statistics};
30use datafusion_physical_expr_common::sort_expr::LexOrdering;
31use object_store::ObjectMeta;
32use object_store::path::Path;
33use std::any::Any;
34use std::collections::HashMap;
35use std::fmt::{Debug, Formatter};
36use std::ops::Deref;
37use std::sync::Arc;
38use std::time::Duration;
39
40pub use super::list_files_cache::{
41 DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
42};
43
/// Cached metadata for a file, including statistics and ordering.
///
/// This struct embeds the [`ObjectMeta`] used for cache validation,
/// along with the cached statistics and ordering information.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CachedFileMetadata {
    /// File metadata used for cache validation (size, last_modified).
    pub meta: ObjectMeta,
    /// Cached statistics for the file, shared behind an [`Arc`].
    pub statistics: Arc<Statistics>,
    /// Cached ordering for the file, or `None` when no ordering is cached.
    pub ordering: Option<LexOrdering>,
}
57
58impl CachedFileMetadata {
59 /// Create a new cached file metadata entry.
60 pub fn new(
61 meta: ObjectMeta,
62 statistics: Arc<Statistics>,
63 ordering: Option<LexOrdering>,
64 ) -> Self {
65 Self {
66 meta,
67 statistics,
68 ordering,
69 }
70 }
71
72 /// Check if this cached entry is still valid for the given metadata.
73 ///
74 /// Returns true if the file size and last modified time match.
75 pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool {
76 self.meta.size == current_meta.size
77 && self.meta.last_modified == current_meta.last_modified
78 }
79}
80
/// A cache for file statistics and orderings.
///
/// This cache stores [`CachedFileMetadata`] which includes:
/// - File metadata for validation (size, last_modified)
/// - Statistics for the file
/// - Ordering information for the file
///
/// If enabled via [`CacheManagerConfig::with_file_statistics_cache`] this
/// cache avoids inferring the same file statistics repeatedly during the
/// session lifetime.
///
/// The typical usage pattern is:
/// 1. Call `get(path)` to check for cached value
/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
pub trait FileStatisticsCache:
    CacheAccessor<TableScopedPath, CachedFileMetadata>
{
    /// Cache memory limit in bytes.
    fn cache_limit(&self) -> usize;

    /// Updates the cache with a new memory limit in bytes.
    fn update_cache_limit(&self, limit: usize);

    /// Retrieves the information about the entries currently cached.
    fn list_entries(&self) -> HashMap<TableScopedPath, FileStatisticsCacheEntry>;

    /// Drops the cached entries associated with the given table reference.
    // NOTE(review): how a `None` table reference is treated is up to the
    // implementation — confirm against the concrete caches.
    fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
}
112
113impl DFHeapSize for CachedFileMetadata {
114 fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
115 self.meta.size.heap_size(ctx)
116 + self.meta.last_modified.heap_size(ctx)
117 + self.meta.version.heap_size(ctx)
118 + self.meta.e_tag.heap_size(ctx)
119 + self.meta.location.as_ref().heap_size(ctx)
120 + self.statistics.heap_size(ctx)
121 //TODO add ordering once LexOrdering/PhysicalExpr implements DFHeapSize
122 }
123}
124
/// Represents information about a cached statistics entry.
/// This is used to expose the statistics cache contents to outside modules.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileStatisticsCacheEntry {
    /// Object store metadata of the file this entry describes.
    pub object_meta: ObjectMeta,
    /// Number of table rows.
    pub num_rows: Precision<usize>,
    /// Number of table columns.
    pub num_columns: usize,
    /// Total table size, in bytes.
    pub table_size_bytes: Precision<usize>,
    /// Size of the statistics entry, in bytes.
    pub statistics_size_bytes: usize,
    /// Whether ordering information is cached for this file.
    pub has_ordering: bool,
}
141
/// Cached file listing.
///
/// The list is held behind an [`Arc`] so that it can be handed out without
/// copying the individual [`ObjectMeta`] values.
///
/// TTL expiration is handled internally by the cache implementation.
#[derive(Debug, Clone, PartialEq)]
pub struct CachedFileList {
    /// The cached file list.
    pub files: Arc<Vec<ObjectMeta>>,
}
150
151impl CachedFileList {
152 /// Create a new cached file list.
153 pub fn new(files: Vec<ObjectMeta>) -> Self {
154 Self {
155 files: Arc::new(files),
156 }
157 }
158
159 /// Filter the files by prefix.
160 fn filter_by_prefix(&self, prefix: &Option<Path>) -> Vec<ObjectMeta> {
161 match prefix {
162 Some(prefix) => self
163 .files
164 .iter()
165 .filter(|meta| meta.location.as_ref().starts_with(prefix.as_ref()))
166 .cloned()
167 .collect(),
168 None => self.files.as_ref().clone(),
169 }
170 }
171
172 /// Returns files matching the given prefix.
173 ///
174 /// When prefix is `None`, returns a clone of the `Arc` (no data copy).
175 /// When filtering is needed, returns a new `Arc` with filtered results (clones each matching [`ObjectMeta`]).
176 pub fn files_matching_prefix(&self, prefix: &Option<Path>) -> Arc<Vec<ObjectMeta>> {
177 match prefix {
178 None => Arc::clone(&self.files),
179 Some(p) => Arc::new(self.filter_by_prefix(&Some(p.clone()))),
180 }
181 }
182}
183
184impl Deref for CachedFileList {
185 type Target = Arc<Vec<ObjectMeta>>;
186 fn deref(&self) -> &Self::Target {
187 &self.files
188 }
189}
190
191impl From<Vec<ObjectMeta>> for CachedFileList {
192 fn from(files: Vec<ObjectMeta>) -> Self {
193 Self::new(files)
194 }
195}
196
/// Cache for storing the [`ObjectMeta`]s that result from listing a path
///
/// Listing a path means doing an object store "list" operation or `ls`
/// command on the local filesystem. This operation can be expensive,
/// especially when done over remote object stores.
///
/// The cache key is always the table's base path, ensuring a stable cache key.
/// The cached value is a [`CachedFileList`] containing the listed files; TTL
/// expiration is handled internally by the cache implementation.
///
/// Partition filtering is done after retrieval using [`CachedFileList::files_matching_prefix`].
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait ListFilesCache: CacheAccessor<TableScopedPath, CachedFileList> {
    /// Returns the cache's memory limit in bytes.
    fn cache_limit(&self) -> usize;

    /// Returns the TTL (time-to-live) for cache entries, if configured.
    fn cache_ttl(&self) -> Option<Duration>;

    /// Updates the cache with a new memory limit in bytes.
    fn update_cache_limit(&self, limit: usize);

    /// Updates the cache with a new TTL (time-to-live).
    fn update_cache_ttl(&self, ttl: Option<Duration>);

    /// Retrieves the information about the entries currently cached.
    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;

    /// Drop all entries for the given table reference.
    fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
}
228
/// Generic file-embedded metadata used with [`FileMetadataCache`].
///
/// For example, Parquet footers and page metadata can be represented
/// using this trait.
///
/// Implementations are stored as `Arc<dyn FileMetadata>` inside a
/// [`CachedFileMetadataEntry`].
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
pub trait FileMetadata: Any + Send + Sync {
    /// Returns the file metadata as [`Any`] so that it can be downcast to a specific
    /// implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the size of the metadata in bytes.
    fn memory_size(&self) -> usize;

    /// Returns extra information about this entry (used by [`FileMetadataCache::list_entries`]).
    fn extra_info(&self) -> HashMap<String, String>;
}
246
/// Cached file metadata entry with validation information.
///
/// `Debug` is implemented by hand for this type because the metadata is held
/// as a trait object.
#[derive(Clone)]
pub struct CachedFileMetadataEntry {
    /// File metadata used for cache validation (size, last_modified).
    pub meta: ObjectMeta,
    /// The cached file metadata.
    pub file_metadata: Arc<dyn FileMetadata>,
}
255
256impl CachedFileMetadataEntry {
257 /// Create a new cached file metadata entry.
258 pub fn new(meta: ObjectMeta, file_metadata: Arc<dyn FileMetadata>) -> Self {
259 Self {
260 meta,
261 file_metadata,
262 }
263 }
264
265 /// Check if this cached entry is still valid for the given metadata.
266 pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool {
267 self.meta.size == current_meta.size
268 && self.meta.last_modified == current_meta.last_modified
269 }
270}
271
272impl Debug for CachedFileMetadataEntry {
273 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
274 f.debug_struct("CachedFileMetadataEntry")
275 .field("meta", &self.meta)
276 .field("memory_size", &self.file_metadata.memory_size())
277 .finish()
278 }
279}
280
/// Cache for file-embedded metadata.
///
/// This cache stores per-file metadata in the form of [`CachedFileMetadataEntry`],
/// which includes the [`ObjectMeta`] for validation.
///
/// For example, the built in [`ListingTable`] uses this cache to avoid parsing
/// Parquet footers multiple times for the same file.
///
/// DataFusion provides a default implementation, [`DefaultFilesMetadataCache`],
/// and users can also provide their own implementations to implement custom
/// caching strategies.
///
/// The typical usage pattern is:
/// 1. Call `get(path)` to check for cached value
/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
///
/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
pub trait FileMetadataCache: CacheAccessor<Path, CachedFileMetadataEntry> {
    /// Returns the cache's memory limit in bytes.
    fn cache_limit(&self) -> usize;

    /// Updates the cache with a new memory limit in bytes.
    fn update_cache_limit(&self, limit: usize);

    /// Retrieves the information about the entries currently cached.
    fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
}
311
#[derive(Debug, Clone, PartialEq, Eq)]
/// Represents information about a cached metadata entry.
/// This is used to expose the metadata cache contents to outside modules.
pub struct FileMetadataCacheEntry {
    /// Object store metadata of the file this entry describes.
    pub object_meta: ObjectMeta,
    /// Size of the cached metadata, in bytes.
    pub size_bytes: usize,
    /// Number of times this entry was retrieved.
    pub hits: usize,
    /// Additional object-specific information.
    pub extra: HashMap<String, String>,
}
324
325impl Debug for dyn FileStatisticsCache {
326 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
327 write!(f, "Cache name: {} with length: {}", self.name(), self.len())
328 }
329}
330
331impl Debug for dyn ListFilesCache {
332 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
333 write!(f, "Cache name: {} with length: {}", self.name(), self.len())
334 }
335}
336
337impl Debug for dyn FileMetadataCache {
338 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
339 write!(f, "Cache name: {} with length: {}", self.name(), self.len())
340 }
341}
342
/// Manages various caches used in DataFusion.
///
/// Following DataFusion design principles, DataFusion provides default cache
/// implementations, while also allowing users to provide their own custom cache
/// implementations by implementing the relevant traits.
///
/// See [`CacheManagerConfig`] for configuration options.
#[derive(Debug)]
pub struct CacheManager {
    /// File statistics cache; `None` when the configured limit is 0.
    file_statistic_cache: Option<Arc<dyn FileStatisticsCache>>,
    /// List files cache; `None` when the configured limit is 0.
    list_files_cache: Option<Arc<dyn ListFilesCache>>,
    /// File-embedded metadata cache; always present.
    file_metadata_cache: Arc<dyn FileMetadataCache>,
}
356
357impl CacheManager {
358 pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
359 let file_statistic_cache = match &config.file_statistics_cache {
360 Some(fsc) if config.file_statistics_cache_limit > 0 => {
361 fsc.update_cache_limit(config.file_statistics_cache_limit);
362 Some(Arc::clone(fsc))
363 }
364 None if config.file_statistics_cache_limit > 0 => {
365 let fsc: Arc<dyn FileStatisticsCache> = Arc::new(
366 DefaultFileStatisticsCache::new(config.file_statistics_cache_limit),
367 );
368 Some(fsc)
369 }
370 _ => None,
371 };
372
373 let list_files_cache = match &config.list_files_cache {
374 Some(lfc) if config.list_files_cache_limit > 0 => {
375 // the cache memory limit or ttl might have changed, ensure they are updated
376 lfc.update_cache_limit(config.list_files_cache_limit);
377 // Only update TTL if explicitly set in config, otherwise preserve the cache's existing TTL
378 if let Some(ttl) = config.list_files_cache_ttl {
379 lfc.update_cache_ttl(Some(ttl));
380 }
381 Some(Arc::clone(lfc))
382 }
383 None if config.list_files_cache_limit > 0 => {
384 let lfc: Arc<dyn ListFilesCache> = Arc::new(DefaultListFilesCache::new(
385 config.list_files_cache_limit,
386 config.list_files_cache_ttl,
387 ));
388 Some(lfc)
389 }
390 _ => None,
391 };
392
393 let file_metadata_cache = config
394 .file_metadata_cache
395 .as_ref()
396 .map(Arc::clone)
397 .unwrap_or_else(|| {
398 Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
399 });
400
401 // the cache memory limit might have changed, ensure the limit is updated
402 file_metadata_cache.update_cache_limit(config.metadata_cache_limit);
403
404 Ok(Arc::new(CacheManager {
405 file_statistic_cache,
406 list_files_cache,
407 file_metadata_cache,
408 }))
409 }
410
411 /// Get the file statistics cache.
412 pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
413 self.file_statistic_cache.clone()
414 }
415
416 /// Get the memory limit of the file statistics cache.
417 pub fn get_file_statistic_cache_limit(&self) -> usize {
418 self.file_statistic_cache
419 .as_ref()
420 .map_or(0, |c| c.cache_limit())
421 }
422
423 /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
424 pub fn get_list_files_cache(&self) -> Option<Arc<dyn ListFilesCache>> {
425 self.list_files_cache.clone()
426 }
427
428 /// Get the memory limit of the list files cache.
429 pub fn get_list_files_cache_limit(&self) -> usize {
430 self.list_files_cache
431 .as_ref()
432 .map_or(0, |c| c.cache_limit())
433 }
434
435 /// Get the TTL (time-to-live) of the list files cache.
436 pub fn get_list_files_cache_ttl(&self) -> Option<Duration> {
437 self.list_files_cache.as_ref().and_then(|c| c.cache_ttl())
438 }
439
440 /// Get the file embedded metadata cache.
441 pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
442 Arc::clone(&self.file_metadata_cache)
443 }
444
445 /// Get the limit of the file embedded metadata cache.
446 pub fn get_metadata_cache_limit(&self) -> usize {
447 self.file_metadata_cache.cache_limit()
448 }
449}
450
/// Default memory limit of the file-embedded metadata cache, in bytes.
pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50 MiB
452
/// Configuration used by [`CacheManager::try_new`] to set up the caches.
#[derive(Clone)]
pub struct CacheManagerConfig {
    /// Enable caching of file statistics when listing files.
    /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
    /// Default is enabled. Currently only Parquet files are supported.
    ///
    /// When this is `None` and `file_statistics_cache_limit` is non-zero, the
    /// [`CacheManager`] creates a [`DefaultFileStatisticsCache`].
    pub file_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
    /// Limit of the file statistics cache, in bytes. Default: 20MiB.
    /// A limit of 0 disables the cache.
    pub file_statistics_cache_limit: usize,
    /// Enable caching of file metadata when listing files.
    /// Enabling the cache avoids repeat list and object metadata fetch operations, which may be
    /// expensive in certain situations (e.g. remote object storage), for objects under paths that
    /// are cached.
    /// Note that if this option is enabled, DataFusion will not see any updates to the underlying
    /// storage for at least `list_files_cache_ttl` duration.
    /// Default is enabled.
    ///
    /// When this is `None` and `list_files_cache_limit` is non-zero, the
    /// [`CacheManager`] creates a [`DefaultListFilesCache`].
    pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
    /// Limit of the `list_files_cache`, in bytes. Default: 1MiB.
    /// A limit of 0 disables the cache.
    pub list_files_cache_limit: usize,
    /// The duration the list files cache will consider an entry valid after insertion. Note that
    /// changes to the underlying storage system, such as adding or removing data, will not be
    /// visible until an entry expires. Default: None (infinite).
    ///
    /// When `None`, the TTL of a user-provided `list_files_cache` is left untouched.
    pub list_files_cache_ttl: Option<Duration>,
    /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
    /// data file (e.g., Parquet footer and page metadata).
    /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
    pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    /// Limit of the file-embedded metadata cache, in bytes.
    /// Default: [`DEFAULT_METADATA_CACHE_LIMIT`].
    pub metadata_cache_limit: usize,
}
482
483impl Default for CacheManagerConfig {
484 fn default() -> Self {
485 Self {
486 file_statistics_cache: Default::default(),
487 file_statistics_cache_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
488 list_files_cache: Default::default(),
489 list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
490 list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
491 file_metadata_cache: Default::default(),
492 metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
493 }
494 }
495}
496
497impl CacheManagerConfig {
498 /// Set the cache for file statistics.
499 pub fn with_file_statistics_cache(
500 mut self,
501 cache: Option<Arc<dyn FileStatisticsCache>>,
502 ) -> Self {
503 self.file_statistics_cache = cache;
504 self
505 }
506
507 /// Specifies the memory limit for the file statistics cache, in bytes.
508 pub fn with_file_statistics_cache_limit(mut self, limit: usize) -> Self {
509 self.file_statistics_cache_limit = limit;
510 self
511 }
512
513 /// Set the cache for listing files.
514 ///
515 /// Default is `None` (disabled).
516 pub fn with_list_files_cache(
517 mut self,
518 cache: Option<Arc<dyn ListFilesCache>>,
519 ) -> Self {
520 self.list_files_cache = cache;
521 self
522 }
523
524 /// Sets the limit of the list files cache, in bytes.
525 ///
526 /// Default: 1MiB (1,048,576 bytes).
527 pub fn with_list_files_cache_limit(mut self, limit: usize) -> Self {
528 self.list_files_cache_limit = limit;
529 self
530 }
531
532 /// Sets the TTL (time-to-live) for entries in the list files cache.
533 ///
534 /// Default: None (infinite).
535 pub fn with_list_files_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
536 self.list_files_cache_ttl = ttl;
537 self
538 }
539
540 /// Sets the cache for file-embedded metadata.
541 ///
542 /// Default is a [`DefaultFilesMetadataCache`].
543 pub fn with_file_metadata_cache(
544 mut self,
545 cache: Option<Arc<dyn FileMetadataCache>>,
546 ) -> Self {
547 self.file_metadata_cache = cache;
548 self
549 }
550
551 /// Sets the limit of the file-embedded metadata cache, in bytes.
552 pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
553 self.metadata_cache_limit = limit;
554 self
555 }
556}
557
#[cfg(test)]
mod tests {
    use super::*;

    /// A user-provided list files cache must keep its own TTL when the config
    /// does not carry an explicit one.
    ///
    /// Regression test for issue #19396, where the TTL was unset from
    /// `DefaultListFilesCache` when `CacheManagerConfig::list_files_cache_ttl`
    /// was not set explicitly.
    #[test]
    fn test_ttl_preserved_when_not_set_in_config() {
        // Cache created with a one second TTL.
        let cache = DefaultListFilesCache::new(1024, Some(Duration::from_secs(1)));

        // Sanity check: the TTL is reported by the cache before it is wired in.
        assert_eq!(
            cache.cache_ttl(),
            Some(Duration::from_secs(1)),
            "Cache should have TTL = 1 second initially"
        );

        // The config receives the cache, but `list_files_cache_ttl` is never set.
        let config =
            CacheManagerConfig::default().with_list_files_cache(Some(Arc::new(cache)));

        let manager = CacheManager::try_new(&config).unwrap();

        // The TTL seen through the manager must not have been cleared ...
        let cache_ttl = manager.get_list_files_cache().unwrap().cache_ttl();

        assert!(
            cache_ttl.is_some(),
            "TTL should be preserved when not set in config. Expected Some(Duration::from_secs(1)), got {cache_ttl:?}"
        );

        // ... and must still carry the original value.
        assert_eq!(
            cache_ttl,
            Some(Duration::from_secs(1)),
            "TTL should be exactly 1 second"
        );
    }

    /// An explicit TTL in the config takes precedence over the TTL the
    /// provided cache was created with.
    #[test]
    fn test_ttl_overridden_when_set_in_config() {
        // Cache created with a one second TTL.
        let cache = DefaultListFilesCache::new(1024, Some(Duration::from_secs(1)));

        // The config carries both the cache and a different, explicit TTL.
        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(cache)))
            .with_list_files_cache_ttl(Some(Duration::from_secs(60)));

        let manager = CacheManager::try_new(&config).unwrap();

        // The value from the config wins.
        let cache_ttl = manager.get_list_files_cache().unwrap().cache_ttl();

        assert_eq!(
            cache_ttl,
            Some(Duration::from_secs(60)),
            "TTL should be overridden to 60 seconds when set in config"
        );
    }
}
625}