datafusion_execution/cache/cache_manager.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cache::cache_unit::DefaultFilesMetadataCache;
19use crate::cache::list_files_cache::ListFilesEntry;
20use crate::cache::list_files_cache::TableScopedPath;
21use crate::cache::{CacheAccessor, DefaultListFilesCache};
22use datafusion_common::TableReference;
23use datafusion_common::stats::Precision;
24use datafusion_common::{Result, Statistics};
25use object_store::ObjectMeta;
26use object_store::path::Path;
27use std::any::Any;
28use std::collections::HashMap;
29use std::fmt::{Debug, Formatter};
30use std::sync::Arc;
31use std::time::Duration;
32
33pub use super::list_files_cache::{
34 DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
35};
36
37/// A cache for [`Statistics`].
38///
39/// If enabled via [`CacheManagerConfig::with_files_statistics_cache`] this
40/// cache avoids inferring the same file statistics repeatedly during the
41/// session lifetime.
42///
43/// See [`crate::runtime_env::RuntimeEnv`] for more details
44pub trait FileStatisticsCache:
45 CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>
46{
47 /// Retrieves the information about the entries currently cached.
48 fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
49}
50
51/// Represents information about a cached statistics entry.
52/// This is used to expose the statistics cache contents to outside modules.
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct FileStatisticsCacheEntry {
55 pub object_meta: ObjectMeta,
56 /// Number of table rows.
57 pub num_rows: Precision<usize>,
58 /// Number of table columns.
59 pub num_columns: usize,
60 /// Total table size, in bytes.
61 pub table_size_bytes: Precision<usize>,
62 /// Size of the statistics entry, in bytes.
63 pub statistics_size_bytes: usize,
64}
65
66/// Cache for storing the [`ObjectMeta`]s that result from listing a path
67///
68/// Listing a path means doing an object store "list" operation or `ls`
69/// command on the local filesystem. This operation can be expensive,
70/// especially when done over remote object stores.
71///
72/// The cache key is always the table's base path, ensuring a stable cache key.
73/// The `Extra` type is `Option<Path>`, representing an optional prefix filter
74/// (relative to the table base path) for partition-aware lookups.
75///
76/// When `get_with_extra(key, Some(prefix))` is called:
77/// - The cache entry for `key` (table base path) is fetched
78/// - Results are filtered to only include files matching `key/prefix`
79/// - Filtered results are returned without making a storage call
80///
81/// This enables efficient partition pruning: a single cached listing of the
82/// full table can serve queries for any partition subset.
83///
84/// See [`crate::runtime_env::RuntimeEnv`] for more details.
85pub trait ListFilesCache:
86 CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
87{
88 /// Returns the cache's memory limit in bytes.
89 fn cache_limit(&self) -> usize;
90
91 /// Returns the TTL (time-to-live) for cache entries, if configured.
92 fn cache_ttl(&self) -> Option<Duration>;
93
94 /// Updates the cache with a new memory limit in bytes.
95 fn update_cache_limit(&self, limit: usize);
96
97 /// Updates the cache with a new TTL (time-to-live).
98 fn update_cache_ttl(&self, ttl: Option<Duration>);
99
100 /// Retrieves the information about the entries currently cached.
101 fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;
102
103 fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
104}
105
/// File-embedded metadata that can be stored in a [`FileMetadataCache`].
///
/// Parquet footers and page metadata are examples of metadata that can be
/// represented through this trait.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait FileMetadata: Any + Send + Sync {
    /// Exposes `self` as [`Any`] so callers can downcast to the concrete
    /// metadata type.
    fn as_any(&self) -> &dyn Any;

    /// Size of this metadata, in bytes.
    fn memory_size(&self) -> usize;

    /// Extra key/value details about this entry, surfaced through
    /// [`FileMetadataCache::list_entries`].
    fn extra_info(&self) -> HashMap<String, String>;
}
123
124/// Cache for file-embedded metadata.
125///
126/// This cache stores per-file metadata in the form of [`FileMetadata`],
127///
128/// For example, the built in [`ListingTable`] uses this cache to avoid parsing
129/// Parquet footers multiple times for the same file.
130///
131/// DataFusion provides a default implementation, [`DefaultFilesMetadataCache`],
132/// and users can also provide their own implementations to implement custom
133/// caching strategies.
134///
135/// See [`crate::runtime_env::RuntimeEnv`] for more details.
136///
137/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
138pub trait FileMetadataCache:
139 CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
140{
141 /// Returns the cache's memory limit in bytes.
142 fn cache_limit(&self) -> usize;
143
144 /// Updates the cache with a new memory limit in bytes.
145 fn update_cache_limit(&self, limit: usize);
146
147 /// Retrieves the information about the entries currently cached.
148 fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
149}
150
151#[derive(Debug, Clone, PartialEq, Eq)]
152/// Represents information about a cached metadata entry.
153/// This is used to expose the metadata cache contents to outside modules.
154pub struct FileMetadataCacheEntry {
155 pub object_meta: ObjectMeta,
156 /// Size of the cached metadata, in bytes.
157 pub size_bytes: usize,
158 /// Number of times this entry was retrieved.
159 pub hits: usize,
160 /// Additional object-specific information.
161 pub extra: HashMap<String, String>,
162}
163
164impl Debug for dyn FileStatisticsCache {
165 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
166 write!(f, "Cache name: {} with length: {}", self.name(), self.len())
167 }
168}
169
170impl Debug for dyn ListFilesCache {
171 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172 write!(f, "Cache name: {} with length: {}", self.name(), self.len())
173 }
174}
175
176impl Debug for dyn FileMetadataCache {
177 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178 write!(f, "Cache name: {} with length: {}", self.name(), self.len())
179 }
180}
181
182/// Manages various caches used in DataFusion.
183///
184/// Following DataFusion design principles, DataFusion provides default cache
185/// implementations, while also allowing users to provide their own custom cache
186/// implementations by implementing the relevant traits.
187///
188/// See [`CacheManagerConfig`] for configuration options.
189#[derive(Debug)]
190pub struct CacheManager {
191 file_statistic_cache: Option<Arc<dyn FileStatisticsCache>>,
192 list_files_cache: Option<Arc<dyn ListFilesCache>>,
193 file_metadata_cache: Arc<dyn FileMetadataCache>,
194}
195
196impl CacheManager {
197 pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
198 let file_statistic_cache =
199 config.table_files_statistics_cache.as_ref().map(Arc::clone);
200
201 let list_files_cache = match &config.list_files_cache {
202 Some(lfc) if config.list_files_cache_limit > 0 => {
203 // the cache memory limit or ttl might have changed, ensure they are updated
204 lfc.update_cache_limit(config.list_files_cache_limit);
205 // Only update TTL if explicitly set in config, otherwise preserve the cache's existing TTL
206 if let Some(ttl) = config.list_files_cache_ttl {
207 lfc.update_cache_ttl(Some(ttl));
208 }
209 Some(Arc::clone(lfc))
210 }
211 None if config.list_files_cache_limit > 0 => {
212 let lfc: Arc<dyn ListFilesCache> = Arc::new(DefaultListFilesCache::new(
213 config.list_files_cache_limit,
214 config.list_files_cache_ttl,
215 ));
216 Some(lfc)
217 }
218 _ => None,
219 };
220
221 let file_metadata_cache = config
222 .file_metadata_cache
223 .as_ref()
224 .map(Arc::clone)
225 .unwrap_or_else(|| {
226 Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
227 });
228
229 // the cache memory limit might have changed, ensure the limit is updated
230 file_metadata_cache.update_cache_limit(config.metadata_cache_limit);
231
232 Ok(Arc::new(CacheManager {
233 file_statistic_cache,
234 list_files_cache,
235 file_metadata_cache,
236 }))
237 }
238
239 /// Get the cache of listing files statistics.
240 pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
241 self.file_statistic_cache.clone()
242 }
243
244 /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
245 pub fn get_list_files_cache(&self) -> Option<Arc<dyn ListFilesCache>> {
246 self.list_files_cache.clone()
247 }
248
249 /// Get the memory limit of the list files cache.
250 pub fn get_list_files_cache_limit(&self) -> usize {
251 self.list_files_cache
252 .as_ref()
253 .map_or(0, |c| c.cache_limit())
254 }
255
256 /// Get the TTL (time-to-live) of the list files cache.
257 pub fn get_list_files_cache_ttl(&self) -> Option<Duration> {
258 self.list_files_cache.as_ref().and_then(|c| c.cache_ttl())
259 }
260
261 /// Get the file embedded metadata cache.
262 pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
263 Arc::clone(&self.file_metadata_cache)
264 }
265
266 /// Get the limit of the file embedded metadata cache.
267 pub fn get_metadata_cache_limit(&self) -> usize {
268 self.file_metadata_cache.cache_limit()
269 }
270}
271
/// Default memory limit of the file-embedded metadata cache, in bytes (50 MiB).
pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024;
273
274#[derive(Clone)]
275pub struct CacheManagerConfig {
276 /// Enable caching of file statistics when listing files.
277 /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
278 /// Default is disabled. Currently only Parquet files are supported.
279 pub table_files_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
280 /// Enable caching of file metadata when listing files.
281 /// Enabling the cache avoids repeat list and object metadata fetch operations, which may be
282 /// expensive in certain situations (e.g. remote object storage), for objects under paths that
283 /// are cached.
284 /// Note that if this option is enabled, DataFusion will not see any updates to the underlying
285 /// storage for at least `list_files_cache_ttl` duration.
286 /// Default is disabled.
287 pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
288 /// Limit of the `list_files_cache`, in bytes. Default: 1MiB.
289 pub list_files_cache_limit: usize,
290 /// The duration the list files cache will consider an entry valid after insertion. Note that
291 /// changes to the underlying storage system, such as adding or removing data, will not be
292 /// visible until an entry expires. Default: None (infinite).
293 pub list_files_cache_ttl: Option<Duration>,
294 /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
295 /// data file (e.g., Parquet footer and page metadata).
296 /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
297 pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
298 /// Limit of the file-embedded metadata cache, in bytes.
299 pub metadata_cache_limit: usize,
300}
301
302impl Default for CacheManagerConfig {
303 fn default() -> Self {
304 Self {
305 table_files_statistics_cache: Default::default(),
306 list_files_cache: Default::default(),
307 list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
308 list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
309 file_metadata_cache: Default::default(),
310 metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
311 }
312 }
313}
314
315impl CacheManagerConfig {
316 /// Set the cache for files statistics.
317 ///
318 /// Default is `None` (disabled).
319 pub fn with_files_statistics_cache(
320 mut self,
321 cache: Option<Arc<dyn FileStatisticsCache>>,
322 ) -> Self {
323 self.table_files_statistics_cache = cache;
324 self
325 }
326
327 /// Set the cache for listing files.
328 ///
329 /// Default is `None` (disabled).
330 pub fn with_list_files_cache(
331 mut self,
332 cache: Option<Arc<dyn ListFilesCache>>,
333 ) -> Self {
334 self.list_files_cache = cache;
335 self
336 }
337
338 /// Sets the limit of the list files cache, in bytes.
339 ///
340 /// Default: 1MiB (1,048,576 bytes).
341 pub fn with_list_files_cache_limit(mut self, limit: usize) -> Self {
342 self.list_files_cache_limit = limit;
343 self
344 }
345
346 /// Sets the TTL (time-to-live) for entries in the list files cache.
347 ///
348 /// Default: None (infinite).
349 pub fn with_list_files_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
350 self.list_files_cache_ttl = ttl;
351 self
352 }
353
354 /// Sets the cache for file-embedded metadata.
355 ///
356 /// Default is a [`DefaultFilesMetadataCache`].
357 pub fn with_file_metadata_cache(
358 mut self,
359 cache: Option<Arc<dyn FileMetadataCache>>,
360 ) -> Self {
361 self.file_metadata_cache = cache;
362 self
363 }
364
365 /// Sets the limit of the file-embedded metadata cache, in bytes.
366 pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
367 self.metadata_cache_limit = limit;
368 self
369 }
370}
371
#[cfg(test)]
mod tests {
    // `Arc`, `Duration` and `DefaultListFilesCache` are all imported by the parent
    // module and reach this module through the glob import.
    use super::*;

    /// Test to verify that TTL is preserved when not explicitly set in config.
    /// This fixes issue #19396 where TTL was being unset from DefaultListFilesCache
    /// when CacheManagerConfig::list_files_cache_ttl was not set explicitly.
    #[test]
    fn test_ttl_preserved_when_not_set_in_config() {
        // Create a cache with TTL = 1 second
        let list_file_cache =
            DefaultListFilesCache::new(1024, Some(Duration::from_secs(1)));

        // Verify the cache has TTL set initially
        assert_eq!(
            list_file_cache.cache_ttl(),
            Some(Duration::from_secs(1)),
            "Cache should have TTL = 1 second initially"
        );

        // Put cache in config WITHOUT setting list_files_cache_ttl
        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(list_file_cache)));

        let cache_manager = CacheManager::try_new(&config).unwrap();

        // Verify TTL is preserved (not unset) and unchanged
        let cache_ttl = cache_manager.get_list_files_cache().unwrap().cache_ttl();
        assert_eq!(
            cache_ttl,
            Some(Duration::from_secs(1)),
            "TTL should be preserved when not set in config"
        );
    }

    /// Test to verify that TTL can still be overridden when explicitly set in config.
    #[test]
    fn test_ttl_overridden_when_set_in_config() {
        // Create a cache with TTL = 1 second
        let list_file_cache =
            DefaultListFilesCache::new(1024, Some(Duration::from_secs(1)));

        // Put cache in config WITH a different TTL set
        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(list_file_cache)))
            .with_list_files_cache_ttl(Some(Duration::from_secs(60)));

        let cache_manager = CacheManager::try_new(&config).unwrap();

        // Verify TTL is overridden to the config value
        let cache_ttl = cache_manager.get_list_files_cache().unwrap().cache_ttl();
        assert_eq!(
            cache_ttl,
            Some(Duration::from_secs(60)),
            "TTL should be overridden to 60 seconds when set in config"
        );
    }

    /// A list files cache limit of 0 disables the cache, even when one is supplied.
    #[test]
    fn test_list_files_cache_disabled_when_limit_is_zero() {
        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(DefaultListFilesCache::new(
                1024,
                Some(Duration::from_secs(1)),
            ))))
            .with_list_files_cache_limit(0);

        let cache_manager = CacheManager::try_new(&config).unwrap();

        assert!(cache_manager.get_list_files_cache().is_none());
        assert_eq!(cache_manager.get_list_files_cache_limit(), 0);
        assert_eq!(cache_manager.get_list_files_cache_ttl(), None);
    }

    /// Without a supplied cache, a default list files cache is built from the config.
    #[test]
    fn test_default_list_files_cache_created_from_config() {
        let config = CacheManagerConfig::default()
            .with_list_files_cache_limit(2048)
            .with_list_files_cache_ttl(Some(Duration::from_secs(30)));

        let cache_manager = CacheManager::try_new(&config).unwrap();

        assert!(cache_manager.get_list_files_cache().is_some());
        assert_eq!(cache_manager.get_list_files_cache_limit(), 2048);
        assert_eq!(
            cache_manager.get_list_files_cache_ttl(),
            Some(Duration::from_secs(30))
        );
    }

    /// The configured metadata cache limit is applied to the metadata cache.
    #[test]
    fn test_metadata_cache_limit_applied() {
        let config = CacheManagerConfig::default().with_metadata_cache_limit(4096);

        let cache_manager = CacheManager::try_new(&config).unwrap();

        assert_eq!(cache_manager.get_metadata_cache_limit(), 4096);
    }
}