datafusion_execution/cache/cache_manager.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cache::CacheAccessor;
19use crate::cache::DefaultListFilesCache;
20use crate::cache::cache_unit::DefaultFilesMetadataCache;
21use crate::cache::list_files_cache::ListFilesEntry;
22use crate::cache::list_files_cache::TableScopedPath;
23use datafusion_common::TableReference;
24use datafusion_common::stats::Precision;
25use datafusion_common::{Result, Statistics};
26use datafusion_physical_expr_common::sort_expr::LexOrdering;
27use object_store::ObjectMeta;
28use object_store::path::Path;
29use std::any::Any;
30use std::collections::HashMap;
31use std::fmt::{Debug, Formatter};
32use std::ops::Deref;
33use std::sync::Arc;
34use std::time::Duration;
35
36pub use super::list_files_cache::{
37 DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
38};
39
/// Cached metadata for a file, including statistics and ordering.
///
/// This struct embeds the [`ObjectMeta`] observed when the entry was created,
/// so callers can detect stale entries with [`CachedFileMetadata::is_valid_for`],
/// along with the cached statistics and ordering information.
#[derive(Debug, Clone)]
pub struct CachedFileMetadata {
    /// File metadata used for cache validation (size, last_modified).
    pub meta: ObjectMeta,
    /// Cached statistics for the file.
    pub statistics: Arc<Statistics>,
    /// Cached ordering for the file, or `None` if no ordering is recorded.
    pub ordering: Option<LexOrdering>,
}
53
54impl CachedFileMetadata {
55 /// Create a new cached file metadata entry.
56 pub fn new(
57 meta: ObjectMeta,
58 statistics: Arc<Statistics>,
59 ordering: Option<LexOrdering>,
60 ) -> Self {
61 Self {
62 meta,
63 statistics,
64 ordering,
65 }
66 }
67
68 /// Check if this cached entry is still valid for the given metadata.
69 ///
70 /// Returns true if the file size and last modified time match.
71 pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool {
72 self.meta.size == current_meta.size
73 && self.meta.last_modified == current_meta.last_modified
74 }
75}
76
/// A cache for file statistics and orderings.
///
/// This cache stores [`CachedFileMetadata`] which includes:
/// - File metadata for validation (size, last_modified)
/// - Statistics for the file
/// - Ordering information for the file
///
/// If enabled via [`CacheManagerConfig::with_files_statistics_cache`] this
/// cache avoids inferring the same file statistics repeatedly during the
/// session lifetime.
///
/// The typical usage pattern is:
/// 1. Call `get(path)` to check for cached value
/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
pub trait FileStatisticsCache: CacheAccessor<Path, CachedFileMetadata> {
    /// Retrieves the information about the entries currently cached.
    ///
    /// The returned map is keyed by file path, with one summary
    /// ([`FileStatisticsCacheEntry`]) per cached file.
    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
}
98
/// Represents information about a cached statistics entry.
/// This is used to expose the statistics cache contents to outside modules.
///
/// Values of this type are returned by [`FileStatisticsCache::list_entries`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileStatisticsCacheEntry {
    /// Metadata of the file this entry describes.
    pub object_meta: ObjectMeta,
    /// Number of table rows.
    pub num_rows: Precision<usize>,
    /// Number of table columns.
    pub num_columns: usize,
    /// Total table size, in bytes.
    pub table_size_bytes: Precision<usize>,
    /// Size of the statistics entry, in bytes.
    pub statistics_size_bytes: usize,
    /// Whether ordering information is cached for this file.
    pub has_ordering: bool,
}
115
/// Cached file listing.
///
/// Holds the [`ObjectMeta`]s produced by listing a path. The list sits behind
/// an [`Arc`], so cloning a `CachedFileList` does not copy the entries.
///
/// TTL expiration is handled internally by the cache implementation.
#[derive(Debug, Clone, PartialEq)]
pub struct CachedFileList {
    /// The cached file list.
    pub files: Arc<Vec<ObjectMeta>>,
}
124
125impl CachedFileList {
126 /// Create a new cached file list.
127 pub fn new(files: Vec<ObjectMeta>) -> Self {
128 Self {
129 files: Arc::new(files),
130 }
131 }
132
133 /// Filter the files by prefix.
134 fn filter_by_prefix(&self, prefix: &Option<Path>) -> Vec<ObjectMeta> {
135 match prefix {
136 Some(prefix) => self
137 .files
138 .iter()
139 .filter(|meta| meta.location.as_ref().starts_with(prefix.as_ref()))
140 .cloned()
141 .collect(),
142 None => self.files.as_ref().clone(),
143 }
144 }
145
146 /// Returns files matching the given prefix.
147 ///
148 /// When prefix is `None`, returns a clone of the `Arc` (no data copy).
149 /// When filtering is needed, returns a new `Arc` with filtered results (clones each matching [`ObjectMeta`]).
150 pub fn files_matching_prefix(&self, prefix: &Option<Path>) -> Arc<Vec<ObjectMeta>> {
151 match prefix {
152 None => Arc::clone(&self.files),
153 Some(p) => Arc::new(self.filter_by_prefix(&Some(p.clone()))),
154 }
155 }
156}
157
158impl Deref for CachedFileList {
159 type Target = Arc<Vec<ObjectMeta>>;
160 fn deref(&self) -> &Self::Target {
161 &self.files
162 }
163}
164
165impl From<Vec<ObjectMeta>> for CachedFileList {
166 fn from(files: Vec<ObjectMeta>) -> Self {
167 Self::new(files)
168 }
169}
170
/// Cache for storing the [`ObjectMeta`]s that result from listing a path
///
/// Listing a path means doing an object store "list" operation or `ls`
/// command on the local filesystem. This operation can be expensive,
/// especially when done over remote object stores.
///
/// The cache key is always the table's base path, ensuring a stable cache key.
/// The cached value is a [`CachedFileList`] containing the listed files. Entry
/// expiration (TTL) is tracked by the cache implementation itself, not stored
/// in the value.
///
/// Partition filtering is done after retrieval using [`CachedFileList::files_matching_prefix`].
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait ListFilesCache: CacheAccessor<TableScopedPath, CachedFileList> {
    /// Returns the cache's memory limit in bytes.
    fn cache_limit(&self) -> usize;

    /// Returns the TTL (time-to-live) for cache entries, if configured.
    ///
    /// `None` means no TTL is configured.
    fn cache_ttl(&self) -> Option<Duration>;

    /// Updates the cache with a new memory limit in bytes.
    ///
    /// Takes `&self`, so implementations must use interior mutability.
    /// [`CacheManager::try_new`] calls this with the configured limit.
    fn update_cache_limit(&self, limit: usize);

    /// Updates the cache with a new TTL (time-to-live).
    ///
    /// [`CacheManager::try_new`] only calls this when the config explicitly
    /// sets a TTL.
    fn update_cache_ttl(&self, ttl: Option<Duration>);

    /// Retrieves the information about the entries currently cached.
    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;

    /// Drop all entries for the given table reference.
    ///
    /// NOTE(review): the meaning of `None` here is defined by the
    /// implementation (presumably entries not tied to a named table); confirm.
    fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
}
202
/// Generic file-embedded metadata used with [`FileMetadataCache`].
///
/// For example, Parquet footers and page metadata can be represented
/// using this trait.
///
/// Implementations are stored as `Arc<dyn FileMetadata>` (see
/// [`CachedFileMetadataEntry`]). The `Any` supertrait together with
/// [`FileMetadata::as_any`] lets callers downcast back to the concrete type.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
pub trait FileMetadata: Any + Send + Sync {
    /// Returns the file metadata as [`Any`] so that it can be downcast to a specific
    /// implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the size of the metadata in bytes.
    ///
    /// Also reported by the `Debug` output of [`CachedFileMetadataEntry`].
    /// NOTE(review): presumably used by cache implementations for memory-limit
    /// accounting; confirm in the implementations.
    fn memory_size(&self) -> usize;

    /// Returns extra information about this entry (used by [`FileMetadataCache::list_entries`]).
    fn extra_info(&self) -> HashMap<String, String>;
}
220
/// Cached file metadata entry with validation information.
///
/// Pairs the parsed [`FileMetadata`] with the [`ObjectMeta`] of the file it was
/// read from, so stale entries can be detected with
/// [`CachedFileMetadataEntry::is_valid_for`]. `Debug` is implemented by hand
/// because `dyn FileMetadata` is not `Debug`.
#[derive(Clone)]
pub struct CachedFileMetadataEntry {
    /// File metadata used for cache validation (size, last_modified).
    pub meta: ObjectMeta,
    /// The cached file metadata.
    pub file_metadata: Arc<dyn FileMetadata>,
}
229
230impl CachedFileMetadataEntry {
231 /// Create a new cached file metadata entry.
232 pub fn new(meta: ObjectMeta, file_metadata: Arc<dyn FileMetadata>) -> Self {
233 Self {
234 meta,
235 file_metadata,
236 }
237 }
238
239 /// Check if this cached entry is still valid for the given metadata.
240 pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool {
241 self.meta.size == current_meta.size
242 && self.meta.last_modified == current_meta.last_modified
243 }
244}
245
246impl Debug for CachedFileMetadataEntry {
247 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
248 f.debug_struct("CachedFileMetadataEntry")
249 .field("meta", &self.meta)
250 .field("memory_size", &self.file_metadata.memory_size())
251 .finish()
252 }
253}
254
/// Cache for file-embedded metadata.
///
/// This cache stores per-file metadata in the form of [`CachedFileMetadataEntry`],
/// which includes the [`ObjectMeta`] for validation.
///
/// For example, the built in [`ListingTable`] uses this cache to avoid parsing
/// Parquet footers multiple times for the same file.
///
/// DataFusion provides a default implementation, [`DefaultFilesMetadataCache`],
/// and users can also provide their own implementations to implement custom
/// caching strategies.
///
/// The typical usage pattern is:
/// 1. Call `get(path)` to check for cached value
/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
///
/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
pub trait FileMetadataCache: CacheAccessor<Path, CachedFileMetadataEntry> {
    /// Returns the cache's memory limit in bytes.
    fn cache_limit(&self) -> usize;

    /// Updates the cache with a new memory limit in bytes.
    ///
    /// [`CacheManager::try_new`] calls this with the configured
    /// `metadata_cache_limit`, including for user-supplied caches.
    fn update_cache_limit(&self, limit: usize);

    /// Retrieves the information about the entries currently cached.
    fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
}
285
/// Represents information about a cached metadata entry.
/// This is used to expose the metadata cache contents to outside modules.
///
/// Values of this type are returned by [`FileMetadataCache::list_entries`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMetadataCacheEntry {
    /// Metadata of the file this entry describes.
    pub object_meta: ObjectMeta,
    /// Size of the cached metadata, in bytes.
    pub size_bytes: usize,
    /// Number of times this entry was retrieved.
    pub hits: usize,
    /// Additional object-specific information.
    pub extra: HashMap<String, String>,
}
298
299impl Debug for dyn FileStatisticsCache {
300 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
301 write!(f, "Cache name: {} with length: {}", self.name(), self.len())
302 }
303}
304
305impl Debug for dyn ListFilesCache {
306 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
307 write!(f, "Cache name: {} with length: {}", self.name(), self.len())
308 }
309}
310
311impl Debug for dyn FileMetadataCache {
312 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
313 write!(f, "Cache name: {} with length: {}", self.name(), self.len())
314 }
315}
316
/// Manages various caches used in DataFusion.
///
/// Following DataFusion design principles, DataFusion provides default cache
/// implementations, while also allowing users to provide their own custom cache
/// implementations by implementing the relevant traits.
///
/// See [`CacheManagerConfig`] for configuration options.
#[derive(Debug)]
pub struct CacheManager {
    /// Per-file statistics/ordering cache; `None` when none was configured.
    file_statistic_cache: Option<Arc<dyn FileStatisticsCache>>,
    /// Directory listing cache; `None` when disabled (configured limit of `0`).
    list_files_cache: Option<Arc<dyn ListFilesCache>>,
    /// File-embedded metadata cache (e.g. Parquet footers); always present.
    file_metadata_cache: Arc<dyn FileMetadataCache>,
}
330
331impl CacheManager {
332 pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
333 let file_statistic_cache =
334 config.table_files_statistics_cache.as_ref().map(Arc::clone);
335
336 let list_files_cache = match &config.list_files_cache {
337 Some(lfc) if config.list_files_cache_limit > 0 => {
338 // the cache memory limit or ttl might have changed, ensure they are updated
339 lfc.update_cache_limit(config.list_files_cache_limit);
340 // Only update TTL if explicitly set in config, otherwise preserve the cache's existing TTL
341 if let Some(ttl) = config.list_files_cache_ttl {
342 lfc.update_cache_ttl(Some(ttl));
343 }
344 Some(Arc::clone(lfc))
345 }
346 None if config.list_files_cache_limit > 0 => {
347 let lfc: Arc<dyn ListFilesCache> = Arc::new(DefaultListFilesCache::new(
348 config.list_files_cache_limit,
349 config.list_files_cache_ttl,
350 ));
351 Some(lfc)
352 }
353 _ => None,
354 };
355
356 let file_metadata_cache = config
357 .file_metadata_cache
358 .as_ref()
359 .map(Arc::clone)
360 .unwrap_or_else(|| {
361 Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
362 });
363
364 // the cache memory limit might have changed, ensure the limit is updated
365 file_metadata_cache.update_cache_limit(config.metadata_cache_limit);
366
367 Ok(Arc::new(CacheManager {
368 file_statistic_cache,
369 list_files_cache,
370 file_metadata_cache,
371 }))
372 }
373
374 /// Get the cache of listing files statistics.
375 pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
376 self.file_statistic_cache.clone()
377 }
378
379 /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
380 pub fn get_list_files_cache(&self) -> Option<Arc<dyn ListFilesCache>> {
381 self.list_files_cache.clone()
382 }
383
384 /// Get the memory limit of the list files cache.
385 pub fn get_list_files_cache_limit(&self) -> usize {
386 self.list_files_cache
387 .as_ref()
388 .map_or(0, |c| c.cache_limit())
389 }
390
391 /// Get the TTL (time-to-live) of the list files cache.
392 pub fn get_list_files_cache_ttl(&self) -> Option<Duration> {
393 self.list_files_cache.as_ref().and_then(|c| c.cache_ttl())
394 }
395
396 /// Get the file embedded metadata cache.
397 pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
398 Arc::clone(&self.file_metadata_cache)
399 }
400
401 /// Get the limit of the file embedded metadata cache.
402 pub fn get_metadata_cache_limit(&self) -> usize {
403 self.file_metadata_cache.cache_limit()
404 }
405}
406
/// Default memory limit of the file-embedded metadata cache, in bytes (50 MiB).
pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M

/// Configuration consumed by [`CacheManager::try_new`] to build a [`CacheManager`].
#[derive(Clone)]
pub struct CacheManagerConfig {
    /// Enable caching of file statistics when listing files.
    /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
    /// Default is disabled. Currently only Parquet files are supported.
    pub table_files_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
    /// Cache for the results of listing files (the [`ObjectMeta`]s under a path).
    /// Enabling the cache avoids repeat list and object metadata fetch operations, which may be
    /// expensive in certain situations (e.g. remote object storage), for objects under paths that
    /// are cached.
    /// Note that while this cache is enabled, DataFusion may not see updates to the underlying
    /// storage until the cached entry expires (see `list_files_cache_ttl`).
    /// Default is `None`. In that case [`CacheManager::try_new`] still creates a
    /// [`DefaultListFilesCache`] as long as `list_files_cache_limit` is non-zero. Set that
    /// limit to `0` to disable list files caching.
    pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
    /// Limit of the `list_files_cache`, in bytes. Default: 1MiB.
    /// A value of `0` disables the list files cache, even if one was supplied.
    pub list_files_cache_limit: usize,
    /// The duration the list files cache will consider an entry valid after insertion. Note that
    /// changes to the underlying storage system, such as adding or removing data, will not be
    /// visible until an entry expires. Default: None (infinite).
    /// When `None` and a cache is supplied via `list_files_cache`, that cache's own TTL is
    /// left unchanged.
    pub list_files_cache_ttl: Option<Duration>,
    /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
    /// data file (e.g., Parquet footer and page metadata).
    /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
    pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    /// Limit of the file-embedded metadata cache, in bytes.
    /// Default: [`DEFAULT_METADATA_CACHE_LIMIT`]. Applied even to a supplied cache.
    pub metadata_cache_limit: usize,
}
436
437impl Default for CacheManagerConfig {
438 fn default() -> Self {
439 Self {
440 table_files_statistics_cache: Default::default(),
441 list_files_cache: Default::default(),
442 list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
443 list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
444 file_metadata_cache: Default::default(),
445 metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
446 }
447 }
448}
449
450impl CacheManagerConfig {
451 /// Set the cache for files statistics.
452 ///
453 /// Default is `None` (disabled).
454 pub fn with_files_statistics_cache(
455 mut self,
456 cache: Option<Arc<dyn FileStatisticsCache>>,
457 ) -> Self {
458 self.table_files_statistics_cache = cache;
459 self
460 }
461
462 /// Set the cache for listing files.
463 ///
464 /// Default is `None` (disabled).
465 pub fn with_list_files_cache(
466 mut self,
467 cache: Option<Arc<dyn ListFilesCache>>,
468 ) -> Self {
469 self.list_files_cache = cache;
470 self
471 }
472
473 /// Sets the limit of the list files cache, in bytes.
474 ///
475 /// Default: 1MiB (1,048,576 bytes).
476 pub fn with_list_files_cache_limit(mut self, limit: usize) -> Self {
477 self.list_files_cache_limit = limit;
478 self
479 }
480
481 /// Sets the TTL (time-to-live) for entries in the list files cache.
482 ///
483 /// Default: None (infinite).
484 pub fn with_list_files_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
485 self.list_files_cache_ttl = ttl;
486 self
487 }
488
489 /// Sets the cache for file-embedded metadata.
490 ///
491 /// Default is a [`DefaultFilesMetadataCache`].
492 pub fn with_file_metadata_cache(
493 mut self,
494 cache: Option<Arc<dyn FileMetadataCache>>,
495 ) -> Self {
496 self.file_metadata_cache = cache;
497 self
498 }
499
500 /// Sets the limit of the file-embedded metadata cache, in bytes.
501 pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
502 self.metadata_cache_limit = limit;
503 self
504 }
505}
506
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::DefaultListFilesCache;

    const ONE_SECOND: Duration = Duration::from_secs(1);

    /// Builds a `CacheManager` from `config` and reports the TTL of its
    /// list files cache (which must be enabled).
    fn resulting_ttl(config: &CacheManagerConfig) -> Option<Duration> {
        let manager = CacheManager::try_new(config).unwrap();
        manager.get_list_files_cache().unwrap().cache_ttl()
    }

    /// Test to verify that TTL is preserved when not explicitly set in config.
    /// This fixes issue #19396 where TTL was being unset from DefaultListFilesCache
    /// when CacheManagerConfig::list_files_cache_ttl was not set explicitly.
    #[test]
    fn test_ttl_preserved_when_not_set_in_config() {
        // A cache that starts out with a 1 second TTL.
        let cache = DefaultListFilesCache::new(1024, Some(ONE_SECOND));
        assert_eq!(
            cache.cache_ttl(),
            Some(ONE_SECOND),
            "Cache should have TTL = 1 second initially"
        );

        // Hand the cache to the config but leave `list_files_cache_ttl` unset.
        let config =
            CacheManagerConfig::default().with_list_files_cache(Some(Arc::new(cache)));

        let cache_ttl = resulting_ttl(&config);
        assert!(
            cache_ttl.is_some(),
            "TTL should be preserved when not set in config. Expected Some(Duration::from_secs(1)), got {cache_ttl:?}"
        );
        assert_eq!(
            cache_ttl,
            Some(ONE_SECOND),
            "TTL should be exactly 1 second"
        );
    }

    /// Test to verify that TTL can still be overridden when explicitly set in config.
    #[test]
    fn test_ttl_overridden_when_set_in_config() {
        // Same 1 second cache, but this time the config asks for 60 seconds.
        let cache = DefaultListFilesCache::new(1024, Some(ONE_SECOND));
        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(cache)))
            .with_list_files_cache_ttl(Some(Duration::from_secs(60)));

        assert_eq!(
            resulting_ttl(&config),
            Some(Duration::from_secs(60)),
            "TTL should be overridden to 60 seconds when set in config"
        );
    }
}