Skip to main content

datafusion_execution/cache/
cache_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cache::CacheAccessor;
19use crate::cache::DefaultListFilesCache;
20use crate::cache::cache_unit::DefaultFilesMetadataCache;
21use crate::cache::list_files_cache::ListFilesEntry;
22use crate::cache::list_files_cache::TableScopedPath;
23use datafusion_common::TableReference;
24use datafusion_common::stats::Precision;
25use datafusion_common::{Result, Statistics};
26use datafusion_physical_expr_common::sort_expr::LexOrdering;
27use object_store::ObjectMeta;
28use object_store::path::Path;
29use std::any::Any;
30use std::collections::HashMap;
31use std::fmt::{Debug, Formatter};
32use std::ops::Deref;
33use std::sync::Arc;
34use std::time::Duration;
35
36pub use super::list_files_cache::{
37    DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
38};
39
40/// Cached metadata for a file, including statistics and ordering.
41///
42/// This struct embeds the [`ObjectMeta`] used for cache validation,
43/// along with the cached statistics and ordering information.
44#[derive(Debug, Clone)]
45pub struct CachedFileMetadata {
46    /// File metadata used for cache validation (size, last_modified).
47    pub meta: ObjectMeta,
48    /// Cached statistics for the file, if available.
49    pub statistics: Arc<Statistics>,
50    /// Cached ordering for the file.
51    pub ordering: Option<LexOrdering>,
52}
53
54impl CachedFileMetadata {
55    /// Create a new cached file metadata entry.
56    pub fn new(
57        meta: ObjectMeta,
58        statistics: Arc<Statistics>,
59        ordering: Option<LexOrdering>,
60    ) -> Self {
61        Self {
62            meta,
63            statistics,
64            ordering,
65        }
66    }
67
68    /// Check if this cached entry is still valid for the given metadata.
69    ///
70    /// Returns true if the file size and last modified time match.
71    pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool {
72        self.meta.size == current_meta.size
73            && self.meta.last_modified == current_meta.last_modified
74    }
75}
76
77/// A cache for file statistics and orderings.
78///
79/// This cache stores [`CachedFileMetadata`] which includes:
80/// - File metadata for validation (size, last_modified)
81/// - Statistics for the file
82/// - Ordering information for the file
83///
84/// If enabled via [`CacheManagerConfig::with_files_statistics_cache`] this
85/// cache avoids inferring the same file statistics repeatedly during the
86/// session lifetime.
87///
88/// The typical usage pattern is:
89/// 1. Call `get(path)` to check for cached value
90/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
91/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
92///
93/// See [`crate::runtime_env::RuntimeEnv`] for more details
94pub trait FileStatisticsCache: CacheAccessor<Path, CachedFileMetadata> {
95    /// Retrieves the information about the entries currently cached.
96    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
97}
98
99/// Represents information about a cached statistics entry.
100/// This is used to expose the statistics cache contents to outside modules.
101#[derive(Debug, Clone, PartialEq, Eq)]
102pub struct FileStatisticsCacheEntry {
103    pub object_meta: ObjectMeta,
104    /// Number of table rows.
105    pub num_rows: Precision<usize>,
106    /// Number of table columns.
107    pub num_columns: usize,
108    /// Total table size, in bytes.
109    pub table_size_bytes: Precision<usize>,
110    /// Size of the statistics entry, in bytes.
111    pub statistics_size_bytes: usize,
112    /// Whether ordering information is cached for this file.
113    pub has_ordering: bool,
114}
115
116/// Cached file listing.
117///
118/// TTL expiration is handled internally by the cache implementation.
119#[derive(Debug, Clone, PartialEq)]
120pub struct CachedFileList {
121    /// The cached file list.
122    pub files: Arc<Vec<ObjectMeta>>,
123}
124
125impl CachedFileList {
126    /// Create a new cached file list.
127    pub fn new(files: Vec<ObjectMeta>) -> Self {
128        Self {
129            files: Arc::new(files),
130        }
131    }
132
133    /// Filter the files by prefix.
134    fn filter_by_prefix(&self, prefix: &Option<Path>) -> Vec<ObjectMeta> {
135        match prefix {
136            Some(prefix) => self
137                .files
138                .iter()
139                .filter(|meta| meta.location.as_ref().starts_with(prefix.as_ref()))
140                .cloned()
141                .collect(),
142            None => self.files.as_ref().clone(),
143        }
144    }
145
146    /// Returns files matching the given prefix.
147    ///
148    /// When prefix is `None`, returns a clone of the `Arc` (no data copy).
149    /// When filtering is needed, returns a new `Arc` with filtered results (clones each matching [`ObjectMeta`]).
150    pub fn files_matching_prefix(&self, prefix: &Option<Path>) -> Arc<Vec<ObjectMeta>> {
151        match prefix {
152            None => Arc::clone(&self.files),
153            Some(p) => Arc::new(self.filter_by_prefix(&Some(p.clone()))),
154        }
155    }
156}
157
158impl Deref for CachedFileList {
159    type Target = Arc<Vec<ObjectMeta>>;
160    fn deref(&self) -> &Self::Target {
161        &self.files
162    }
163}
164
165impl From<Vec<ObjectMeta>> for CachedFileList {
166    fn from(files: Vec<ObjectMeta>) -> Self {
167        Self::new(files)
168    }
169}
170
171/// Cache for storing the [`ObjectMeta`]s that result from listing a path
172///
173/// Listing a path means doing an object store "list" operation or `ls`
174/// command on the local filesystem. This operation can be expensive,
175/// especially when done over remote object stores.
176///
177/// The cache key is always the table's base path, ensuring a stable cache key.
178/// The cached value is a [`CachedFileList`] containing the files and a timestamp.
179///
180/// Partition filtering is done after retrieval using [`CachedFileList::files_matching_prefix`].
181///
182/// See [`crate::runtime_env::RuntimeEnv`] for more details.
183pub trait ListFilesCache: CacheAccessor<TableScopedPath, CachedFileList> {
184    /// Returns the cache's memory limit in bytes.
185    fn cache_limit(&self) -> usize;
186
187    /// Returns the TTL (time-to-live) for cache entries, if configured.
188    fn cache_ttl(&self) -> Option<Duration>;
189
190    /// Updates the cache with a new memory limit in bytes.
191    fn update_cache_limit(&self, limit: usize);
192
193    /// Updates the cache with a new TTL (time-to-live).
194    fn update_cache_ttl(&self, ttl: Option<Duration>);
195
196    /// Retrieves the information about the entries currently cached.
197    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;
198
199    /// Drop all entries for the given table reference.
200    fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
201}
202
/// File-embedded metadata that can be stored in a [`FileMetadataCache`].
///
/// Parquet footers and page metadata are examples of values that can be
/// represented through this trait.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
pub trait FileMetadata: Any + Send + Sync {
    /// Upcasts `self` to [`Any`] so callers can downcast to the concrete
    /// metadata implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the size of this metadata, in bytes.
    fn memory_size(&self) -> usize;

    /// Returns implementation-specific key/value details about this entry,
    /// reported through [`FileMetadataCache::list_entries`].
    fn extra_info(&self) -> HashMap<String, String>;
}
220
221/// Cached file metadata entry with validation information.
222#[derive(Clone)]
223pub struct CachedFileMetadataEntry {
224    /// File metadata used for cache validation (size, last_modified).
225    pub meta: ObjectMeta,
226    /// The cached file metadata.
227    pub file_metadata: Arc<dyn FileMetadata>,
228}
229
230impl CachedFileMetadataEntry {
231    /// Create a new cached file metadata entry.
232    pub fn new(meta: ObjectMeta, file_metadata: Arc<dyn FileMetadata>) -> Self {
233        Self {
234            meta,
235            file_metadata,
236        }
237    }
238
239    /// Check if this cached entry is still valid for the given metadata.
240    pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool {
241        self.meta.size == current_meta.size
242            && self.meta.last_modified == current_meta.last_modified
243    }
244}
245
246impl Debug for CachedFileMetadataEntry {
247    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
248        f.debug_struct("CachedFileMetadataEntry")
249            .field("meta", &self.meta)
250            .field("memory_size", &self.file_metadata.memory_size())
251            .finish()
252    }
253}
254
255/// Cache for file-embedded metadata.
256///
257/// This cache stores per-file metadata in the form of [`CachedFileMetadataEntry`],
258/// which includes the [`ObjectMeta`] for validation.
259///
260/// For example, the built in [`ListingTable`] uses this cache to avoid parsing
261/// Parquet footers multiple times for the same file.
262///
263/// DataFusion provides a default implementation, [`DefaultFilesMetadataCache`],
264/// and users can also provide their own implementations to implement custom
265/// caching strategies.
266///
267/// The typical usage pattern is:
268/// 1. Call `get(path)` to check for cached value
269/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
270/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
271///
272/// See [`crate::runtime_env::RuntimeEnv`] for more details.
273///
274/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
275pub trait FileMetadataCache: CacheAccessor<Path, CachedFileMetadataEntry> {
276    /// Returns the cache's memory limit in bytes.
277    fn cache_limit(&self) -> usize;
278
279    /// Updates the cache with a new memory limit in bytes.
280    fn update_cache_limit(&self, limit: usize);
281
282    /// Retrieves the information about the entries currently cached.
283    fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
284}
285
286#[derive(Debug, Clone, PartialEq, Eq)]
287/// Represents information about a cached metadata entry.
288/// This is used to expose the metadata cache contents to outside modules.
289pub struct FileMetadataCacheEntry {
290    pub object_meta: ObjectMeta,
291    /// Size of the cached metadata, in bytes.
292    pub size_bytes: usize,
293    /// Number of times this entry was retrieved.
294    pub hits: usize,
295    /// Additional object-specific information.
296    pub extra: HashMap<String, String>,
297}
298
299impl Debug for dyn FileStatisticsCache {
300    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
301        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
302    }
303}
304
305impl Debug for dyn ListFilesCache {
306    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
307        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
308    }
309}
310
311impl Debug for dyn FileMetadataCache {
312    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
313        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
314    }
315}
316
317/// Manages various caches used in DataFusion.
318///
319/// Following DataFusion design principles, DataFusion provides default cache
320/// implementations, while also allowing users to provide their own custom cache
321/// implementations by implementing the relevant traits.
322///
323/// See [`CacheManagerConfig`] for configuration options.
324#[derive(Debug)]
325pub struct CacheManager {
326    file_statistic_cache: Option<Arc<dyn FileStatisticsCache>>,
327    list_files_cache: Option<Arc<dyn ListFilesCache>>,
328    file_metadata_cache: Arc<dyn FileMetadataCache>,
329}
330
331impl CacheManager {
332    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
333        let file_statistic_cache =
334            config.table_files_statistics_cache.as_ref().map(Arc::clone);
335
336        let list_files_cache = match &config.list_files_cache {
337            Some(lfc) if config.list_files_cache_limit > 0 => {
338                // the cache memory limit or ttl might have changed, ensure they are updated
339                lfc.update_cache_limit(config.list_files_cache_limit);
340                // Only update TTL if explicitly set in config, otherwise preserve the cache's existing TTL
341                if let Some(ttl) = config.list_files_cache_ttl {
342                    lfc.update_cache_ttl(Some(ttl));
343                }
344                Some(Arc::clone(lfc))
345            }
346            None if config.list_files_cache_limit > 0 => {
347                let lfc: Arc<dyn ListFilesCache> = Arc::new(DefaultListFilesCache::new(
348                    config.list_files_cache_limit,
349                    config.list_files_cache_ttl,
350                ));
351                Some(lfc)
352            }
353            _ => None,
354        };
355
356        let file_metadata_cache = config
357            .file_metadata_cache
358            .as_ref()
359            .map(Arc::clone)
360            .unwrap_or_else(|| {
361                Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
362            });
363
364        // the cache memory limit might have changed, ensure the limit is updated
365        file_metadata_cache.update_cache_limit(config.metadata_cache_limit);
366
367        Ok(Arc::new(CacheManager {
368            file_statistic_cache,
369            list_files_cache,
370            file_metadata_cache,
371        }))
372    }
373
374    /// Get the cache of listing files statistics.
375    pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
376        self.file_statistic_cache.clone()
377    }
378
379    /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
380    pub fn get_list_files_cache(&self) -> Option<Arc<dyn ListFilesCache>> {
381        self.list_files_cache.clone()
382    }
383
384    /// Get the memory limit of the list files cache.
385    pub fn get_list_files_cache_limit(&self) -> usize {
386        self.list_files_cache
387            .as_ref()
388            .map_or(0, |c| c.cache_limit())
389    }
390
391    /// Get the TTL (time-to-live) of the list files cache.
392    pub fn get_list_files_cache_ttl(&self) -> Option<Duration> {
393        self.list_files_cache.as_ref().and_then(|c| c.cache_ttl())
394    }
395
396    /// Get the file embedded metadata cache.
397    pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
398        Arc::clone(&self.file_metadata_cache)
399    }
400
401    /// Get the limit of the file embedded metadata cache.
402    pub fn get_metadata_cache_limit(&self) -> usize {
403        self.file_metadata_cache.cache_limit()
404    }
405}
406
/// Default memory limit of the file-embedded metadata cache, in bytes: 50 MiB.
pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 << 20;
408
409#[derive(Clone)]
410pub struct CacheManagerConfig {
411    /// Enable caching of file statistics when listing files.
412    /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
413    /// Default is disabled. Currently only Parquet files are supported.
414    pub table_files_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
415    /// Enable caching of file metadata when listing files.
416    /// Enabling the cache avoids repeat list and object metadata fetch operations, which may be
417    /// expensive in certain situations (e.g. remote object storage), for objects under paths that
418    /// are cached.
419    /// Note that if this option is enabled, DataFusion will not see any updates to the underlying
420    /// storage for at least `list_files_cache_ttl` duration.
421    /// Default is disabled.
422    pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
423    /// Limit of the `list_files_cache`, in bytes. Default: 1MiB.
424    pub list_files_cache_limit: usize,
425    /// The duration the list files cache will consider an entry valid after insertion. Note that
426    /// changes to the underlying storage system, such as adding or removing data, will not be
427    /// visible until an entry expires. Default: None (infinite).
428    pub list_files_cache_ttl: Option<Duration>,
429    /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
430    /// data file (e.g., Parquet footer and page metadata).
431    /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
432    pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
433    /// Limit of the file-embedded metadata cache, in bytes.
434    pub metadata_cache_limit: usize,
435}
436
437impl Default for CacheManagerConfig {
438    fn default() -> Self {
439        Self {
440            table_files_statistics_cache: Default::default(),
441            list_files_cache: Default::default(),
442            list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
443            list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
444            file_metadata_cache: Default::default(),
445            metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
446        }
447    }
448}
449
450impl CacheManagerConfig {
451    /// Set the cache for files statistics.
452    ///
453    /// Default is `None` (disabled).
454    pub fn with_files_statistics_cache(
455        mut self,
456        cache: Option<Arc<dyn FileStatisticsCache>>,
457    ) -> Self {
458        self.table_files_statistics_cache = cache;
459        self
460    }
461
462    /// Set the cache for listing files.
463    ///
464    /// Default is `None` (disabled).
465    pub fn with_list_files_cache(
466        mut self,
467        cache: Option<Arc<dyn ListFilesCache>>,
468    ) -> Self {
469        self.list_files_cache = cache;
470        self
471    }
472
473    /// Sets the limit of the list files cache, in bytes.
474    ///
475    /// Default: 1MiB (1,048,576 bytes).
476    pub fn with_list_files_cache_limit(mut self, limit: usize) -> Self {
477        self.list_files_cache_limit = limit;
478        self
479    }
480
481    /// Sets the TTL (time-to-live) for entries in the list files cache.
482    ///
483    /// Default: None (infinite).
484    pub fn with_list_files_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
485        self.list_files_cache_ttl = ttl;
486        self
487    }
488
489    /// Sets the cache for file-embedded metadata.
490    ///
491    /// Default is a [`DefaultFilesMetadataCache`].
492    pub fn with_file_metadata_cache(
493        mut self,
494        cache: Option<Arc<dyn FileMetadataCache>>,
495    ) -> Self {
496        self.file_metadata_cache = cache;
497        self
498    }
499
500    /// Sets the limit of the file-embedded metadata cache, in bytes.
501    pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
502        self.metadata_cache_limit = limit;
503        self
504    }
505}
506
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::DefaultListFilesCache;
    use std::time::Duration;

    /// Builds a [`CacheManager`] from `config` and returns the TTL reported
    /// by its list files cache (which must exist).
    fn list_files_ttl_of(config: &CacheManagerConfig) -> Option<Duration> {
        let manager = CacheManager::try_new(config).unwrap();
        manager.get_list_files_cache().unwrap().cache_ttl()
    }

    /// Regression test for issue #19396: the TTL of a user-supplied
    /// `DefaultListFilesCache` used to be unset when
    /// `CacheManagerConfig::list_files_cache_ttl` was left at its default.
    #[test]
    fn test_ttl_preserved_when_not_set_in_config() {
        let one_second = Duration::from_secs(1);
        let cache = DefaultListFilesCache::new(1024, Some(one_second));

        assert_eq!(
            cache.cache_ttl(),
            Some(one_second),
            "Cache should have TTL = 1 second initially"
        );

        // `list_files_cache_ttl` is deliberately left at its default.
        let config =
            CacheManagerConfig::default().with_list_files_cache(Some(Arc::new(cache)));
        let cache_ttl = list_files_ttl_of(&config);

        assert!(
            cache_ttl.is_some(),
            "TTL should be preserved when not set in config. Expected Some(Duration::from_secs(1)), got {cache_ttl:?}"
        );
        assert_eq!(cache_ttl, Some(one_second), "TTL should be exactly 1 second");
    }

    /// An explicitly configured TTL must still replace the cache's own TTL.
    #[test]
    fn test_ttl_overridden_when_set_in_config() {
        let cache = DefaultListFilesCache::new(1024, Some(Duration::from_secs(1)));
        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(cache)))
            .with_list_files_cache_ttl(Some(Duration::from_secs(60)));

        assert_eq!(
            list_files_ttl_of(&config),
            Some(Duration::from_secs(60)),
            "TTL should be overridden to 60 seconds when set in config"
        );
    }
}