Skip to main content

datafusion_execution/cache/
cache_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cache::CacheAccessor;
19use crate::cache::DefaultListFilesCache;
20use crate::cache::file_statistics_cache::{
21    DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, DefaultFileStatisticsCache,
22    DefaultFilesMetadataCache,
23};
24use crate::cache::list_files_cache::ListFilesEntry;
25use crate::cache::list_files_cache::TableScopedPath;
26use datafusion_common::TableReference;
27use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx};
28use datafusion_common::stats::Precision;
29use datafusion_common::{Result, Statistics};
30use datafusion_physical_expr_common::sort_expr::LexOrdering;
31use object_store::ObjectMeta;
32use object_store::path::Path;
33use std::any::Any;
34use std::collections::HashMap;
35use std::fmt::{Debug, Formatter};
36use std::ops::Deref;
37use std::sync::Arc;
38use std::time::Duration;
39
40pub use super::list_files_cache::{
41    DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
42};
43
44/// Cached metadata for a file, including statistics and ordering.
45///
46/// This struct embeds the [`ObjectMeta`] used for cache validation,
47/// along with the cached statistics and ordering information.
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub struct CachedFileMetadata {
50    /// File metadata used for cache validation (size, last_modified).
51    pub meta: ObjectMeta,
52    /// Cached statistics for the file, if available.
53    pub statistics: Arc<Statistics>,
54    /// Cached ordering for the file.
55    pub ordering: Option<LexOrdering>,
56}
57
58impl CachedFileMetadata {
59    /// Create a new cached file metadata entry.
60    pub fn new(
61        meta: ObjectMeta,
62        statistics: Arc<Statistics>,
63        ordering: Option<LexOrdering>,
64    ) -> Self {
65        Self {
66            meta,
67            statistics,
68            ordering,
69        }
70    }
71
72    /// Check if this cached entry is still valid for the given metadata.
73    ///
74    /// Returns true if the file size and last modified time match.
75    pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool {
76        self.meta.size == current_meta.size
77            && self.meta.last_modified == current_meta.last_modified
78    }
79}
80
81/// A cache for file statistics and orderings.
82///
83/// This cache stores [`CachedFileMetadata`] which includes:
84/// - File metadata for validation (size, last_modified)
85/// - Statistics for the file
86/// - Ordering information for the file
87///
88/// If enabled via [`CacheManagerConfig::with_file_statistics_cache`] this
89/// cache avoids inferring the same file statistics repeatedly during the
90/// session lifetime.
91///
92/// The typical usage pattern is:
93/// 1. Call `get(path)` to check for cached value
94/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
95/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
96///
97/// See [`crate::runtime_env::RuntimeEnv`] for more details
98pub trait FileStatisticsCache:
99    CacheAccessor<TableScopedPath, CachedFileMetadata>
100{
101    /// Cache memory limit in bytes.
102    fn cache_limit(&self) -> usize;
103
104    /// Updates the cache with a new memory limit in bytes.
105    fn update_cache_limit(&self, limit: usize);
106
107    /// Retrieves the information about the entries currently cached.
108    fn list_entries(&self) -> HashMap<TableScopedPath, FileStatisticsCacheEntry>;
109
110    fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
111}
112
113impl DFHeapSize for CachedFileMetadata {
114    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
115        self.meta.size.heap_size(ctx)
116            + self.meta.last_modified.heap_size(ctx)
117            + self.meta.version.heap_size(ctx)
118            + self.meta.e_tag.heap_size(ctx)
119            + self.meta.location.as_ref().heap_size(ctx)
120            + self.statistics.heap_size(ctx)
121        //TODO add ordering once LexOrdering/PhysicalExpr implements DFHeapSize
122    }
123}
124
125/// Represents information about a cached statistics entry.
126/// This is used to expose the statistics cache contents to outside modules.
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub struct FileStatisticsCacheEntry {
129    pub object_meta: ObjectMeta,
130    /// Number of table rows.
131    pub num_rows: Precision<usize>,
132    /// Number of table columns.
133    pub num_columns: usize,
134    /// Total table size, in bytes.
135    pub table_size_bytes: Precision<usize>,
136    /// Size of the statistics entry, in bytes.
137    pub statistics_size_bytes: usize,
138    /// Whether ordering information is cached for this file.
139    pub has_ordering: bool,
140}
141
142/// Cached file listing.
143///
144/// TTL expiration is handled internally by the cache implementation.
145#[derive(Debug, Clone, PartialEq)]
146pub struct CachedFileList {
147    /// The cached file list.
148    pub files: Arc<Vec<ObjectMeta>>,
149}
150
151impl CachedFileList {
152    /// Create a new cached file list.
153    pub fn new(files: Vec<ObjectMeta>) -> Self {
154        Self {
155            files: Arc::new(files),
156        }
157    }
158
159    /// Filter the files by prefix.
160    fn filter_by_prefix(&self, prefix: &Option<Path>) -> Vec<ObjectMeta> {
161        match prefix {
162            Some(prefix) => self
163                .files
164                .iter()
165                .filter(|meta| meta.location.as_ref().starts_with(prefix.as_ref()))
166                .cloned()
167                .collect(),
168            None => self.files.as_ref().clone(),
169        }
170    }
171
172    /// Returns files matching the given prefix.
173    ///
174    /// When prefix is `None`, returns a clone of the `Arc` (no data copy).
175    /// When filtering is needed, returns a new `Arc` with filtered results (clones each matching [`ObjectMeta`]).
176    pub fn files_matching_prefix(&self, prefix: &Option<Path>) -> Arc<Vec<ObjectMeta>> {
177        match prefix {
178            None => Arc::clone(&self.files),
179            Some(p) => Arc::new(self.filter_by_prefix(&Some(p.clone()))),
180        }
181    }
182}
183
184impl Deref for CachedFileList {
185    type Target = Arc<Vec<ObjectMeta>>;
186    fn deref(&self) -> &Self::Target {
187        &self.files
188    }
189}
190
191impl From<Vec<ObjectMeta>> for CachedFileList {
192    fn from(files: Vec<ObjectMeta>) -> Self {
193        Self::new(files)
194    }
195}
196
197/// Cache for storing the [`ObjectMeta`]s that result from listing a path
198///
199/// Listing a path means doing an object store "list" operation or `ls`
200/// command on the local filesystem. This operation can be expensive,
201/// especially when done over remote object stores.
202///
203/// The cache key is always the table's base path, ensuring a stable cache key.
204/// The cached value is a [`CachedFileList`] containing the files and a timestamp.
205///
206/// Partition filtering is done after retrieval using [`CachedFileList::files_matching_prefix`].
207///
208/// See [`crate::runtime_env::RuntimeEnv`] for more details.
209pub trait ListFilesCache: CacheAccessor<TableScopedPath, CachedFileList> {
210    /// Returns the cache's memory limit in bytes.
211    fn cache_limit(&self) -> usize;
212
213    /// Returns the TTL (time-to-live) for cache entries, if configured.
214    fn cache_ttl(&self) -> Option<Duration>;
215
216    /// Updates the cache with a new memory limit in bytes.
217    fn update_cache_limit(&self, limit: usize);
218
219    /// Updates the cache with a new TTL (time-to-live).
220    fn update_cache_ttl(&self, ttl: Option<Duration>);
221
222    /// Retrieves the information about the entries currently cached.
223    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;
224
225    /// Drop all entries for the given table reference.
226    fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
227}
228
/// Metadata embedded inside a data file, as stored in a [`FileMetadataCache`].
///
/// Parquet footers and page metadata are examples of what an implementation
/// of this trait may hold.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
pub trait FileMetadata: Any + Send + Sync {
    /// Exposes `self` as [`Any`], so callers can downcast to the concrete
    /// metadata type.
    fn as_any(&self) -> &dyn Any;

    /// Memory footprint of the metadata, in bytes.
    fn memory_size(&self) -> usize;

    /// Implementation-specific key/value details about this entry, surfaced by
    /// [`FileMetadataCache::list_entries`].
    fn extra_info(&self) -> HashMap<String, String>;
}
246
247/// Cached file metadata entry with validation information.
248#[derive(Clone)]
249pub struct CachedFileMetadataEntry {
250    /// File metadata used for cache validation (size, last_modified).
251    pub meta: ObjectMeta,
252    /// The cached file metadata.
253    pub file_metadata: Arc<dyn FileMetadata>,
254}
255
256impl CachedFileMetadataEntry {
257    /// Create a new cached file metadata entry.
258    pub fn new(meta: ObjectMeta, file_metadata: Arc<dyn FileMetadata>) -> Self {
259        Self {
260            meta,
261            file_metadata,
262        }
263    }
264
265    /// Check if this cached entry is still valid for the given metadata.
266    pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool {
267        self.meta.size == current_meta.size
268            && self.meta.last_modified == current_meta.last_modified
269    }
270}
271
272impl Debug for CachedFileMetadataEntry {
273    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
274        f.debug_struct("CachedFileMetadataEntry")
275            .field("meta", &self.meta)
276            .field("memory_size", &self.file_metadata.memory_size())
277            .finish()
278    }
279}
280
281/// Cache for file-embedded metadata.
282///
283/// This cache stores per-file metadata in the form of [`CachedFileMetadataEntry`],
284/// which includes the [`ObjectMeta`] for validation.
285///
286/// For example, the built in [`ListingTable`] uses this cache to avoid parsing
287/// Parquet footers multiple times for the same file.
288///
289/// DataFusion provides a default implementation, [`DefaultFilesMetadataCache`],
290/// and users can also provide their own implementations to implement custom
291/// caching strategies.
292///
293/// The typical usage pattern is:
294/// 1. Call `get(path)` to check for cached value
295/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
296/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
297///
298/// See [`crate::runtime_env::RuntimeEnv`] for more details.
299///
300/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
301pub trait FileMetadataCache: CacheAccessor<Path, CachedFileMetadataEntry> {
302    /// Returns the cache's memory limit in bytes.
303    fn cache_limit(&self) -> usize;
304
305    /// Updates the cache with a new memory limit in bytes.
306    fn update_cache_limit(&self, limit: usize);
307
308    /// Retrieves the information about the entries currently cached.
309    fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
310}
311
312#[derive(Debug, Clone, PartialEq, Eq)]
313/// Represents information about a cached metadata entry.
314/// This is used to expose the metadata cache contents to outside modules.
315pub struct FileMetadataCacheEntry {
316    pub object_meta: ObjectMeta,
317    /// Size of the cached metadata, in bytes.
318    pub size_bytes: usize,
319    /// Number of times this entry was retrieved.
320    pub hits: usize,
321    /// Additional object-specific information.
322    pub extra: HashMap<String, String>,
323}
324
325impl Debug for dyn FileStatisticsCache {
326    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
327        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
328    }
329}
330
331impl Debug for dyn ListFilesCache {
332    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
333        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
334    }
335}
336
337impl Debug for dyn FileMetadataCache {
338    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
339        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
340    }
341}
342
343/// Manages various caches used in DataFusion.
344///
345/// Following DataFusion design principles, DataFusion provides default cache
346/// implementations, while also allowing users to provide their own custom cache
347/// implementations by implementing the relevant traits.
348///
349/// See [`CacheManagerConfig`] for configuration options.
350#[derive(Debug)]
351pub struct CacheManager {
352    file_statistic_cache: Option<Arc<dyn FileStatisticsCache>>,
353    list_files_cache: Option<Arc<dyn ListFilesCache>>,
354    file_metadata_cache: Arc<dyn FileMetadataCache>,
355}
356
357impl CacheManager {
358    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
359        let file_statistic_cache = match &config.file_statistics_cache {
360            Some(fsc) if config.file_statistics_cache_limit > 0 => {
361                fsc.update_cache_limit(config.file_statistics_cache_limit);
362                Some(Arc::clone(fsc))
363            }
364            None if config.file_statistics_cache_limit > 0 => {
365                let fsc: Arc<dyn FileStatisticsCache> = Arc::new(
366                    DefaultFileStatisticsCache::new(config.file_statistics_cache_limit),
367                );
368                Some(fsc)
369            }
370            _ => None,
371        };
372
373        let list_files_cache = match &config.list_files_cache {
374            Some(lfc) if config.list_files_cache_limit > 0 => {
375                // the cache memory limit or ttl might have changed, ensure they are updated
376                lfc.update_cache_limit(config.list_files_cache_limit);
377                // Only update TTL if explicitly set in config, otherwise preserve the cache's existing TTL
378                if let Some(ttl) = config.list_files_cache_ttl {
379                    lfc.update_cache_ttl(Some(ttl));
380                }
381                Some(Arc::clone(lfc))
382            }
383            None if config.list_files_cache_limit > 0 => {
384                let lfc: Arc<dyn ListFilesCache> = Arc::new(DefaultListFilesCache::new(
385                    config.list_files_cache_limit,
386                    config.list_files_cache_ttl,
387                ));
388                Some(lfc)
389            }
390            _ => None,
391        };
392
393        let file_metadata_cache = config
394            .file_metadata_cache
395            .as_ref()
396            .map(Arc::clone)
397            .unwrap_or_else(|| {
398                Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
399            });
400
401        // the cache memory limit might have changed, ensure the limit is updated
402        file_metadata_cache.update_cache_limit(config.metadata_cache_limit);
403
404        Ok(Arc::new(CacheManager {
405            file_statistic_cache,
406            list_files_cache,
407            file_metadata_cache,
408        }))
409    }
410
411    /// Get the file statistics cache.
412    pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
413        self.file_statistic_cache.clone()
414    }
415
416    /// Get the memory limit of the file statistics cache.
417    pub fn get_file_statistic_cache_limit(&self) -> usize {
418        self.file_statistic_cache
419            .as_ref()
420            .map_or(0, |c| c.cache_limit())
421    }
422
423    /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
424    pub fn get_list_files_cache(&self) -> Option<Arc<dyn ListFilesCache>> {
425        self.list_files_cache.clone()
426    }
427
428    /// Get the memory limit of the list files cache.
429    pub fn get_list_files_cache_limit(&self) -> usize {
430        self.list_files_cache
431            .as_ref()
432            .map_or(0, |c| c.cache_limit())
433    }
434
435    /// Get the TTL (time-to-live) of the list files cache.
436    pub fn get_list_files_cache_ttl(&self) -> Option<Duration> {
437        self.list_files_cache.as_ref().and_then(|c| c.cache_ttl())
438    }
439
440    /// Get the file embedded metadata cache.
441    pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
442        Arc::clone(&self.file_metadata_cache)
443    }
444
445    /// Get the limit of the file embedded metadata cache.
446    pub fn get_metadata_cache_limit(&self) -> usize {
447        self.file_metadata_cache.cache_limit()
448    }
449}
450
/// Default memory limit of the file-embedded metadata cache, in bytes:
/// 50 MiB (52,428,800 bytes).
pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024;
452
453#[derive(Clone)]
454pub struct CacheManagerConfig {
455    /// Enable caching of file statistics when listing files.
456    /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
457    /// Default is enabled. Currently only Parquet files are supported.
458    pub file_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
459    /// Limit of the file statistics cache, in bytes. Default: 20MiB.
460    pub file_statistics_cache_limit: usize,
461    /// Enable caching of file metadata when listing files.
462    /// Enabling the cache avoids repeat list and object metadata fetch operations, which may be
463    /// expensive in certain situations (e.g. remote object storage), for objects under paths that
464    /// are cached.
465    /// Note that if this option is enabled, DataFusion will not see any updates to the underlying
466    /// storage for at least `list_files_cache_ttl` duration.
467    /// Default is enabled.
468    pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
469    /// Limit of the `list_files_cache`, in bytes. Default: 1MiB.
470    pub list_files_cache_limit: usize,
471    /// The duration the list files cache will consider an entry valid after insertion. Note that
472    /// changes to the underlying storage system, such as adding or removing data, will not be
473    /// visible until an entry expires. Default: None (infinite).
474    pub list_files_cache_ttl: Option<Duration>,
475    /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
476    /// data file (e.g., Parquet footer and page metadata).
477    /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
478    pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
479    /// Limit of the file-embedded metadata cache, in bytes.
480    pub metadata_cache_limit: usize,
481}
482
483impl Default for CacheManagerConfig {
484    fn default() -> Self {
485        Self {
486            file_statistics_cache: Default::default(),
487            file_statistics_cache_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
488            list_files_cache: Default::default(),
489            list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
490            list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
491            file_metadata_cache: Default::default(),
492            metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
493        }
494    }
495}
496
497impl CacheManagerConfig {
498    /// Set the cache for file statistics.
499    pub fn with_file_statistics_cache(
500        mut self,
501        cache: Option<Arc<dyn FileStatisticsCache>>,
502    ) -> Self {
503        self.file_statistics_cache = cache;
504        self
505    }
506
507    /// Specifies the memory limit for the file statistics cache, in bytes.
508    pub fn with_file_statistics_cache_limit(mut self, limit: usize) -> Self {
509        self.file_statistics_cache_limit = limit;
510        self
511    }
512
513    /// Set the cache for listing files.
514    ///
515    /// Default is `None` (disabled).
516    pub fn with_list_files_cache(
517        mut self,
518        cache: Option<Arc<dyn ListFilesCache>>,
519    ) -> Self {
520        self.list_files_cache = cache;
521        self
522    }
523
524    /// Sets the limit of the list files cache, in bytes.
525    ///
526    /// Default: 1MiB (1,048,576 bytes).
527    pub fn with_list_files_cache_limit(mut self, limit: usize) -> Self {
528        self.list_files_cache_limit = limit;
529        self
530    }
531
532    /// Sets the TTL (time-to-live) for entries in the list files cache.
533    ///
534    /// Default: None (infinite).
535    pub fn with_list_files_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
536        self.list_files_cache_ttl = ttl;
537        self
538    }
539
540    /// Sets the cache for file-embedded metadata.
541    ///
542    /// Default is a [`DefaultFilesMetadataCache`].
543    pub fn with_file_metadata_cache(
544        mut self,
545        cache: Option<Arc<dyn FileMetadataCache>>,
546    ) -> Self {
547        self.file_metadata_cache = cache;
548        self
549    }
550
551    /// Sets the limit of the file-embedded metadata cache, in bytes.
552    pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
553        self.metadata_cache_limit = limit;
554        self
555    }
556}
557
#[cfg(test)]
mod tests {
    use super::*;

    /// Test to verify that TTL is preserved when not explicitly set in config.
    /// This fixes issue #19396 where TTL was being unset from DefaultListFilesCache
    /// when CacheManagerConfig::list_files_cache_ttl was not set explicitly.
    #[test]
    fn test_ttl_preserved_when_not_set_in_config() {
        // Create a cache with TTL = 1 second
        let list_file_cache =
            DefaultListFilesCache::new(1024, Some(Duration::from_secs(1)));

        // Verify the cache has TTL set initially
        assert_eq!(
            list_file_cache.cache_ttl(),
            Some(Duration::from_secs(1)),
            "Cache should have TTL = 1 second initially"
        );

        // Put cache in config WITHOUT setting list_files_cache_ttl
        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(list_file_cache)));

        // Create CacheManager from config
        let cache_manager = CacheManager::try_new(&config).unwrap();

        // Verify TTL is preserved (not unset) and keeps its original value
        let cache_ttl = cache_manager.get_list_files_cache().unwrap().cache_ttl();
        assert_eq!(
            cache_ttl,
            Some(Duration::from_secs(1)),
            "TTL should be preserved when not set in config. Expected Some(Duration::from_secs(1)), got {cache_ttl:?}"
        );
    }

    /// Test to verify that TTL can still be overridden when explicitly set in config.
    #[test]
    fn test_ttl_overridden_when_set_in_config() {
        // Create a cache with TTL = 1 second
        let list_file_cache =
            DefaultListFilesCache::new(1024, Some(Duration::from_secs(1)));

        // Put cache in config WITH a different TTL set
        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(list_file_cache)))
            .with_list_files_cache_ttl(Some(Duration::from_secs(60)));

        // Create CacheManager from config
        let cache_manager = CacheManager::try_new(&config).unwrap();

        // Verify TTL is overridden to the config value
        let cache_ttl = cache_manager.get_list_files_cache().unwrap().cache_ttl();

        assert_eq!(
            cache_ttl,
            Some(Duration::from_secs(60)),
            "TTL should be overridden to 60 seconds when set in config"
        );
    }

    /// A zero file statistics limit disables the cache, even when one is supplied.
    #[test]
    fn test_zero_file_statistics_limit_disables_cache() {
        let file_statistics_cache: Arc<dyn FileStatisticsCache> =
            Arc::new(DefaultFileStatisticsCache::new(1024));
        let config = CacheManagerConfig::default()
            .with_file_statistics_cache(Some(file_statistics_cache))
            .with_file_statistics_cache_limit(0);

        let cache_manager = CacheManager::try_new(&config).unwrap();

        assert!(cache_manager.get_file_statistic_cache().is_none());
        assert_eq!(cache_manager.get_file_statistic_cache_limit(), 0);
    }

    /// A zero list files limit disables the cache, even when one is supplied.
    #[test]
    fn test_zero_list_files_limit_disables_cache() {
        let list_file_cache = DefaultListFilesCache::new(1024, None);
        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(list_file_cache)))
            .with_list_files_cache_limit(0);

        let cache_manager = CacheManager::try_new(&config).unwrap();

        assert!(cache_manager.get_list_files_cache().is_none());
        assert_eq!(cache_manager.get_list_files_cache_limit(), 0);
        assert_eq!(cache_manager.get_list_files_cache_ttl(), None);
    }

    /// The configured metadata cache limit is applied to the default metadata cache.
    #[test]
    fn test_metadata_cache_limit_applied() {
        let config = CacheManagerConfig::default().with_metadata_cache_limit(4096);

        let cache_manager = CacheManager::try_new(&config).unwrap();

        assert_eq!(cache_manager.get_metadata_cache_limit(), 4096);
    }
}