datafusion_execution/cache/
cache_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cache::cache_unit::DefaultFilesMetadataCache;
19use crate::cache::CacheAccessor;
20use datafusion_common::{Result, Statistics};
21use object_store::path::Path;
22use object_store::ObjectMeta;
23use std::any::Any;
24use std::collections::HashMap;
25use std::fmt::{Debug, Formatter};
26use std::sync::Arc;
27
28/// A cache for [`Statistics`].
29///
30/// If enabled via [`CacheManagerConfig::with_files_statistics_cache`] this
31/// cache avoids inferring the same file statistics repeatedly during the
32/// session lifetime.
33///
34/// See [`crate::runtime_env::RuntimeEnv`] for more details
35pub type FileStatisticsCache =
36    Arc<dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>>;
37
38/// Cache for storing the [`ObjectMeta`]s that result from listing a path
39///
40/// Listing a path means doing an object store "list" operation or `ls`
41/// command on the local filesystem. This operation can be expensive,
42/// especially when done over remote object stores.
43///
44/// See [`crate::runtime_env::RuntimeEnv`] for more details
45pub type ListFilesCache =
46    Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
47
/// Metadata embedded inside a data file, as stored by a [`FileMetadataCache`].
///
/// Parquet footers and page metadata are one example of what an implementor of
/// this trait can represent.
///
/// Refer to [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait FileMetadata: Any + Send + Sync {
    /// Exposes `self` as [`Any`] so callers can downcast to the concrete
    /// metadata type.
    fn as_any(&self) -> &dyn Any;

    /// Reports how many bytes this metadata occupies.
    fn memory_size(&self) -> usize;

    /// Additional key/value details about this entry, as reported by
    /// [`FileMetadataCache::list_entries`].
    fn extra_info(&self) -> HashMap<String, String>;
}
65
66/// Cache for file-embedded metadata.
67///
68/// This cache stores per-file metadata in the form of [`FileMetadata`],
69///
70/// For example, the built in [`ListingTable`] uses this cache to avoid parsing
71/// Parquet footers multiple times for the same file.
72///
73/// DataFusion provides a default implementation, [`DefaultFilesMetadataCache`],
74/// and users can also provide their own implementations to implement custom
75/// caching strategies.
76///
77/// See [`crate::runtime_env::RuntimeEnv`] for more details.
78///
79/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
80pub trait FileMetadataCache:
81    CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
82{
83    /// Returns the cache's memory limit in bytes.
84    fn cache_limit(&self) -> usize;
85
86    /// Updates the cache with a new memory limit in bytes.
87    fn update_cache_limit(&self, limit: usize);
88
89    /// Retrieves the information about the entries currently cached.
90    fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
91}
92
93#[derive(Debug, Clone, PartialEq, Eq)]
94/// Represents information about a cached metadata entry.
95/// This is used to expose the metadata cache contents to outside modules.
96pub struct FileMetadataCacheEntry {
97    pub object_meta: ObjectMeta,
98    /// Size of the cached metadata, in bytes.
99    pub size_bytes: usize,
100    /// Number of times this entry was retrieved.
101    pub hits: usize,
102    /// Additional object-specific information.
103    pub extra: HashMap<String, String>,
104}
105
106impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
107    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
108        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
109    }
110}
111
112impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta> {
113    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
114        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
115    }
116}
117
118impl Debug for dyn FileMetadataCache {
119    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
120        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
121    }
122}
123
124/// Manages various caches used in DataFusion.
125///
126/// Following DataFusion design principles, DataFusion provides default cache
127/// implementations, while also allowing users to provide their own custom cache
128/// implementations by implementing the relevant traits.
129///
130/// See [`CacheManagerConfig`] for configuration options.
131#[derive(Debug)]
132pub struct CacheManager {
133    file_statistic_cache: Option<FileStatisticsCache>,
134    list_files_cache: Option<ListFilesCache>,
135    file_metadata_cache: Arc<dyn FileMetadataCache>,
136}
137
138impl CacheManager {
139    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
140        let file_statistic_cache =
141            config.table_files_statistics_cache.as_ref().map(Arc::clone);
142
143        let list_files_cache = config.list_files_cache.as_ref().map(Arc::clone);
144
145        let file_metadata_cache = config
146            .file_metadata_cache
147            .as_ref()
148            .map(Arc::clone)
149            .unwrap_or_else(|| {
150                Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
151            });
152
153        // the cache memory limit might have changed, ensure the limit is updated
154        file_metadata_cache.update_cache_limit(config.metadata_cache_limit);
155
156        Ok(Arc::new(CacheManager {
157            file_statistic_cache,
158            list_files_cache,
159            file_metadata_cache,
160        }))
161    }
162
163    /// Get the cache of listing files statistics.
164    pub fn get_file_statistic_cache(&self) -> Option<FileStatisticsCache> {
165        self.file_statistic_cache.clone()
166    }
167
168    /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
169    pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
170        self.list_files_cache.clone()
171    }
172
173    /// Get the file embedded metadata cache.
174    pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
175        Arc::clone(&self.file_metadata_cache)
176    }
177
178    /// Get the limit of the file embedded metadata cache.
179    pub fn get_metadata_cache_limit(&self) -> usize {
180        self.file_metadata_cache.cache_limit()
181    }
182}
183
/// Default memory limit of the file-embedded metadata cache: 50 MiB (50 × 2^20 bytes).
const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 << 20;
185
186#[derive(Clone)]
187pub struct CacheManagerConfig {
188    /// Enable cache of files statistics when listing files.
189    /// Avoid get same file statistics repeatedly in same datafusion session.
190    /// Default is disable. Fow now only supports Parquet files.
191    pub table_files_statistics_cache: Option<FileStatisticsCache>,
192    /// Enable cache of file metadata when listing files.
193    /// This setting avoids listing file meta of the same path repeatedly
194    /// in same session, which may be expensive in certain situations (e.g. remote object storage).
195    /// Note that if this option is enabled, DataFusion will not see any updates to the underlying
196    /// location.  
197    /// Default is disable.
198    pub list_files_cache: Option<ListFilesCache>,
199    /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
200    /// data file (e.g., Parquet footer and page metadata).
201    /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
202    pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
203    /// Limit of the file-embedded metadata cache, in bytes.
204    pub metadata_cache_limit: usize,
205}
206
207impl Default for CacheManagerConfig {
208    fn default() -> Self {
209        Self {
210            table_files_statistics_cache: Default::default(),
211            list_files_cache: Default::default(),
212            file_metadata_cache: Default::default(),
213            metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
214        }
215    }
216}
217
218impl CacheManagerConfig {
219    /// Set the cache for files statistics.
220    ///
221    /// Default is `None` (disabled).
222    pub fn with_files_statistics_cache(
223        mut self,
224        cache: Option<FileStatisticsCache>,
225    ) -> Self {
226        self.table_files_statistics_cache = cache;
227        self
228    }
229
230    /// Set the cache for listing files.
231    ///     
232    /// Default is `None` (disabled).
233    pub fn with_list_files_cache(mut self, cache: Option<ListFilesCache>) -> Self {
234        self.list_files_cache = cache;
235        self
236    }
237
238    /// Sets the cache for file-embedded metadata.
239    ///
240    /// Default is a [`DefaultFilesMetadataCache`].
241    pub fn with_file_metadata_cache(
242        mut self,
243        cache: Option<Arc<dyn FileMetadataCache>>,
244    ) -> Self {
245        self.file_metadata_cache = cache;
246        self
247    }
248
249    /// Sets the limit of the file-embedded metadata cache, in bytes.
250    pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
251        self.metadata_cache_limit = limit;
252        self
253    }
254}