datafusion_execution/cache/
cache_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cache::cache_unit::DefaultFilesMetadataCache;
19use crate::cache::list_files_cache::ListFilesEntry;
20use crate::cache::list_files_cache::TableScopedPath;
21use crate::cache::{CacheAccessor, DefaultListFilesCache};
22use datafusion_common::TableReference;
23use datafusion_common::stats::Precision;
24use datafusion_common::{Result, Statistics};
25use object_store::ObjectMeta;
26use object_store::path::Path;
27use std::any::Any;
28use std::collections::HashMap;
29use std::fmt::{Debug, Formatter};
30use std::sync::Arc;
31use std::time::Duration;
32
33pub use super::list_files_cache::{
34    DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
35};
36
/// A cache for [`Statistics`].
///
/// If enabled via [`CacheManagerConfig::with_files_statistics_cache`] this
/// cache avoids inferring the same file statistics repeatedly during the
/// session lifetime.
///
/// Entries are keyed by [`Path`] and hold an `Arc<Statistics>`; the
/// [`CacheAccessor`] `Extra` type is [`ObjectMeta`].
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait FileStatisticsCache:
    CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>
{
    /// Retrieves the information about the entries currently cached.
    ///
    /// The returned map is keyed by [`Path`], with one
    /// [`FileStatisticsCacheEntry`] per key.
    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
}
50
/// Represents information about a cached statistics entry.
///
/// This is used to expose the statistics cache contents to outside modules
/// (see [`FileStatisticsCache::list_entries`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileStatisticsCacheEntry {
    /// Object store metadata recorded for this entry.
    // NOTE(review): presumably the metadata of the file the statistics were
    // computed for -- confirm against the cache implementation.
    pub object_meta: ObjectMeta,
    /// Number of table rows.
    pub num_rows: Precision<usize>,
    /// Number of table columns.
    pub num_columns: usize,
    /// Total table size, in bytes.
    pub table_size_bytes: Precision<usize>,
    /// Size of the statistics entry, in bytes.
    pub statistics_size_bytes: usize,
}
65
/// Cache for storing the [`ObjectMeta`]s that result from listing a path
///
/// Listing a path means doing an object store "list" operation or `ls`
/// command on the local filesystem. This operation can be expensive,
/// especially when done over remote object stores.
///
/// The cache key is always the table's base path, ensuring a stable cache key.
/// The `Extra` type is `Option<Path>`, representing an optional prefix filter
/// (relative to the table base path) for partition-aware lookups.
///
/// When `get_with_extra(key, Some(prefix))` is called:
/// - The cache entry for `key` (table base path) is fetched
/// - Results are filtered to only include files matching `key/prefix`
/// - Filtered results are returned without making a storage call
///
/// This enables efficient partition pruning: a single cached listing of the
/// full table can serve queries for any partition subset.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait ListFilesCache:
    CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
{
    /// Returns the cache's memory limit in bytes.
    fn cache_limit(&self) -> usize;

    /// Returns the TTL (time-to-live) for cache entries, if configured.
    fn cache_ttl(&self) -> Option<Duration>;

    /// Updates the cache with a new memory limit in bytes.
    fn update_cache_limit(&self, limit: usize);

    /// Updates the cache with a new TTL (time-to-live).
    fn update_cache_ttl(&self, ttl: Option<Duration>);

    /// Retrieves the information about the entries currently cached.
    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;

    /// Drops the cached entries associated with `table_ref`.
    ///
    /// Returns an error if the implementation fails to drop the entries.
    // NOTE(review): the description above is inferred from the method name
    // and signature; how entries are matched to a table, and what a `None`
    // table reference selects, is defined by implementations -- confirm.
    fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
}
105
/// Generic file-embedded metadata used with [`FileMetadataCache`].
///
/// For example, Parquet footers and page metadata can be represented
/// using this trait.
///
/// Implementations must be `Send + Sync` so they can be shared behind an
/// `Arc<dyn FileMetadata>`, which is the value type of [`FileMetadataCache`].
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait FileMetadata: Any + Send + Sync {
    /// Returns the file metadata as [`Any`] so that it can be downcast to a specific
    /// implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the size of the metadata in bytes.
    fn memory_size(&self) -> usize;

    /// Returns extra information about this entry (used by [`FileMetadataCache::list_entries`]).
    ///
    /// The information is returned as free-form string key/value pairs.
    fn extra_info(&self) -> HashMap<String, String>;
}
123
/// Cache for file-embedded metadata.
///
/// This cache stores per-file metadata in the form of [`FileMetadata`].
/// Entries are keyed by [`ObjectMeta`], which is also the [`CacheAccessor`]
/// `Extra` type.
///
/// For example, the built in [`ListingTable`] uses this cache to avoid parsing
/// Parquet footers multiple times for the same file.
///
/// DataFusion provides a default implementation, [`DefaultFilesMetadataCache`],
/// and users can also provide their own implementations to implement custom
/// caching strategies.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
///
/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
pub trait FileMetadataCache:
    CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
{
    /// Returns the cache's memory limit in bytes.
    fn cache_limit(&self) -> usize;

    /// Updates the cache with a new memory limit in bytes.
    fn update_cache_limit(&self, limit: usize);

    /// Retrieves the information about the entries currently cached.
    ///
    /// The returned map is keyed by [`Path`], with one
    /// [`FileMetadataCacheEntry`] per key.
    fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
}
150
/// Represents information about a cached metadata entry.
///
/// This is used to expose the metadata cache contents to outside modules
/// (see [`FileMetadataCache::list_entries`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMetadataCacheEntry {
    /// Object store metadata recorded for this entry.
    // NOTE(review): presumably the metadata of the file whose embedded
    // metadata is cached -- confirm against the cache implementation.
    pub object_meta: ObjectMeta,
    /// Size of the cached metadata, in bytes.
    pub size_bytes: usize,
    /// Number of times this entry was retrieved.
    pub hits: usize,
    /// Additional object-specific information.
    pub extra: HashMap<String, String>,
}
163
164impl Debug for dyn FileStatisticsCache {
165    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
166        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
167    }
168}
169
170impl Debug for dyn ListFilesCache {
171    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
173    }
174}
175
176impl Debug for dyn FileMetadataCache {
177    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
179    }
180}
181
/// Manages various caches used in DataFusion.
///
/// Following DataFusion design principles, DataFusion provides default cache
/// implementations, while also allowing users to provide their own custom cache
/// implementations by implementing the relevant traits.
///
/// See [`CacheManagerConfig`] for configuration options.
#[derive(Debug)]
pub struct CacheManager {
    /// Optional cache of per-file [`Statistics`]; `None` when not configured.
    file_statistic_cache: Option<Arc<dyn FileStatisticsCache>>,
    /// Optional cache of file listings; `None` when the configured
    /// list files cache limit is zero.
    list_files_cache: Option<Arc<dyn ListFilesCache>>,
    /// Cache of file-embedded metadata; always present.
    file_metadata_cache: Arc<dyn FileMetadataCache>,
}
195
196impl CacheManager {
197    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
198        let file_statistic_cache =
199            config.table_files_statistics_cache.as_ref().map(Arc::clone);
200
201        let list_files_cache = match &config.list_files_cache {
202            Some(lfc) if config.list_files_cache_limit > 0 => {
203                // the cache memory limit or ttl might have changed, ensure they are updated
204                lfc.update_cache_limit(config.list_files_cache_limit);
205                // Only update TTL if explicitly set in config, otherwise preserve the cache's existing TTL
206                if let Some(ttl) = config.list_files_cache_ttl {
207                    lfc.update_cache_ttl(Some(ttl));
208                }
209                Some(Arc::clone(lfc))
210            }
211            None if config.list_files_cache_limit > 0 => {
212                let lfc: Arc<dyn ListFilesCache> = Arc::new(DefaultListFilesCache::new(
213                    config.list_files_cache_limit,
214                    config.list_files_cache_ttl,
215                ));
216                Some(lfc)
217            }
218            _ => None,
219        };
220
221        let file_metadata_cache = config
222            .file_metadata_cache
223            .as_ref()
224            .map(Arc::clone)
225            .unwrap_or_else(|| {
226                Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
227            });
228
229        // the cache memory limit might have changed, ensure the limit is updated
230        file_metadata_cache.update_cache_limit(config.metadata_cache_limit);
231
232        Ok(Arc::new(CacheManager {
233            file_statistic_cache,
234            list_files_cache,
235            file_metadata_cache,
236        }))
237    }
238
239    /// Get the cache of listing files statistics.
240    pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
241        self.file_statistic_cache.clone()
242    }
243
244    /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
245    pub fn get_list_files_cache(&self) -> Option<Arc<dyn ListFilesCache>> {
246        self.list_files_cache.clone()
247    }
248
249    /// Get the memory limit of the list files cache.
250    pub fn get_list_files_cache_limit(&self) -> usize {
251        self.list_files_cache
252            .as_ref()
253            .map_or(0, |c| c.cache_limit())
254    }
255
256    /// Get the TTL (time-to-live) of the list files cache.
257    pub fn get_list_files_cache_ttl(&self) -> Option<Duration> {
258        self.list_files_cache.as_ref().and_then(|c| c.cache_ttl())
259    }
260
261    /// Get the file embedded metadata cache.
262    pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
263        Arc::clone(&self.file_metadata_cache)
264    }
265
266    /// Get the limit of the file embedded metadata cache.
267    pub fn get_metadata_cache_limit(&self) -> usize {
268        self.file_metadata_cache.cache_limit()
269    }
270}
271
/// Default memory limit of the file-embedded metadata cache, in bytes (50 MiB).
pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 << 20;
273
/// Configuration used by [`CacheManager::try_new`] to build a [`CacheManager`].
///
/// Start from [`CacheManagerConfig::default`] and use the `with_*` builder
/// methods to customize individual caches and limits.
#[derive(Clone)]
pub struct CacheManagerConfig {
    /// Enable caching of file statistics when listing files.
    /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
    /// Default is disabled. Currently only Parquet files are supported.
    pub table_files_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
    /// Cache used for the results of listing files.
    /// Caching avoids repeat list and object metadata fetch operations, which may be
    /// expensive in certain situations (e.g. remote object storage), for objects under paths that
    /// are cached.
    /// Note that while this cache is in use, DataFusion will not see any updates to the underlying
    /// storage for at least `list_files_cache_ttl` duration.
    /// Default is `None`, in which case [`CacheManager::try_new`] creates a
    /// [`DefaultListFilesCache`] whenever `list_files_cache_limit` is greater than zero.
    pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
    /// Limit of the `list_files_cache`, in bytes. Default: 1MiB.
    /// A limit of `0` disables the list files cache, even if a cache is provided.
    pub list_files_cache_limit: usize,
    /// The duration the list files cache will consider an entry valid after insertion. Note that
    /// changes to the underlying storage system, such as adding or removing data, will not be
    /// visible until an entry expires. Default: None (infinite).
    /// When a cache is provided via `list_files_cache`, `None` leaves that cache's existing TTL
    /// unchanged.
    pub list_files_cache_ttl: Option<Duration>,
    /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
    /// data file (e.g., Parquet footer and page metadata).
    /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
    pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    /// Limit of the file-embedded metadata cache, in bytes.
    /// Default: [`DEFAULT_METADATA_CACHE_LIMIT`]. [`CacheManager::try_new`] applies this limit to
    /// the metadata cache, including one provided via `file_metadata_cache`.
    pub metadata_cache_limit: usize,
}
301
302impl Default for CacheManagerConfig {
303    fn default() -> Self {
304        Self {
305            table_files_statistics_cache: Default::default(),
306            list_files_cache: Default::default(),
307            list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
308            list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
309            file_metadata_cache: Default::default(),
310            metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
311        }
312    }
313}
314
315impl CacheManagerConfig {
316    /// Set the cache for files statistics.
317    ///
318    /// Default is `None` (disabled).
319    pub fn with_files_statistics_cache(
320        mut self,
321        cache: Option<Arc<dyn FileStatisticsCache>>,
322    ) -> Self {
323        self.table_files_statistics_cache = cache;
324        self
325    }
326
327    /// Set the cache for listing files.
328    ///
329    /// Default is `None` (disabled).
330    pub fn with_list_files_cache(
331        mut self,
332        cache: Option<Arc<dyn ListFilesCache>>,
333    ) -> Self {
334        self.list_files_cache = cache;
335        self
336    }
337
338    /// Sets the limit of the list files cache, in bytes.
339    ///
340    /// Default: 1MiB (1,048,576 bytes).
341    pub fn with_list_files_cache_limit(mut self, limit: usize) -> Self {
342        self.list_files_cache_limit = limit;
343        self
344    }
345
346    /// Sets the TTL (time-to-live) for entries in the list files cache.
347    ///
348    /// Default: None (infinite).
349    pub fn with_list_files_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
350        self.list_files_cache_ttl = ttl;
351        self
352    }
353
354    /// Sets the cache for file-embedded metadata.
355    ///
356    /// Default is a [`DefaultFilesMetadataCache`].
357    pub fn with_file_metadata_cache(
358        mut self,
359        cache: Option<Arc<dyn FileMetadataCache>>,
360    ) -> Self {
361        self.file_metadata_cache = cache;
362        self
363    }
364
365    /// Sets the limit of the file-embedded metadata cache, in bytes.
366    pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
367        self.metadata_cache_limit = limit;
368        self
369    }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::DefaultListFilesCache;
    use std::time::Duration;

    /// Builds a list files cache with a 1024 byte limit and a TTL of one second.
    fn one_second_ttl_cache() -> DefaultListFilesCache {
        DefaultListFilesCache::new(1024, Some(Duration::from_secs(1)))
    }

    /// Creates a [`CacheManager`] from `config` and reports the TTL of its
    /// list files cache.
    fn list_files_ttl_after_build(config: &CacheManagerConfig) -> Option<Duration> {
        CacheManager::try_new(config)
            .unwrap()
            .get_list_files_cache()
            .unwrap()
            .cache_ttl()
    }

    /// Test to verify that TTL is preserved when not explicitly set in config.
    /// This fixes issue #19396 where TTL was being unset from DefaultListFilesCache
    /// when CacheManagerConfig::list_files_cache_ttl was not set explicitly.
    #[test]
    fn test_ttl_preserved_when_not_set_in_config() {
        let one_second = Some(Duration::from_secs(1));
        let list_file_cache = one_second_ttl_cache();

        // The freshly built cache must report the TTL it was created with.
        assert_eq!(
            list_file_cache.cache_ttl(),
            one_second,
            "Cache should have TTL = 1 second initially"
        );

        // The config carries the cache but leaves list_files_cache_ttl untouched.
        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(list_file_cache)));

        let cache_ttl = list_files_ttl_after_build(&config);

        // Building the manager must not have cleared the TTL ...
        assert!(
            cache_ttl.is_some(),
            "TTL should be preserved when not set in config. Expected Some(Duration::from_secs(1)), got {cache_ttl:?}"
        );

        // ... nor changed its value.
        assert_eq!(cache_ttl, one_second, "TTL should be exactly 1 second");
    }

    /// Test to verify that TTL can still be overridden when explicitly set in config.
    #[test]
    fn test_ttl_overridden_when_set_in_config() {
        let sixty_seconds = Some(Duration::from_secs(60));

        // The cache starts at one second while the config explicitly asks for sixty.
        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(one_second_ttl_cache())))
            .with_list_files_cache_ttl(sixty_seconds);

        let cache_ttl = list_files_ttl_after_build(&config);

        // The explicit config value wins over the cache's own TTL.
        assert_eq!(
            cache_ttl, sixty_seconds,
            "TTL should be overridden to 60 seconds when set in config"
        );
    }
}