datafusion_execution/cache/cache_manager.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cache::cache_unit::DefaultFilesMetadataCache;
19use crate::cache::CacheAccessor;
20use datafusion_common::{Result, Statistics};
21use object_store::path::Path;
22use object_store::ObjectMeta;
23use std::any::Any;
24use std::collections::HashMap;
25use std::fmt::{Debug, Formatter};
26use std::sync::Arc;
27
/// A cache for [`Statistics`].
///
/// If enabled via [`CacheManagerConfig::with_files_statistics_cache`] this
/// cache avoids inferring the same file statistics repeatedly during the
/// session lifetime.
///
/// The alias is a shared ([`Arc`]) trait object: a [`CacheAccessor`]
/// parameterized with [`Path`] and `Arc<Statistics>`, with [`ObjectMeta`] as
/// its `Extra` type.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub type FileStatisticsCache =
    Arc<dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>>;
37
/// Cache for storing the [`ObjectMeta`]s that result from listing a path.
///
/// Listing a path means doing an object store "list" operation or `ls`
/// command on the local filesystem. This operation can be expensive,
/// especially when done over remote object stores.
///
/// The alias is a shared ([`Arc`]) trait object: a [`CacheAccessor`]
/// parameterized with [`Path`] and `Arc<Vec<ObjectMeta>>`, with
/// [`ObjectMeta`] as its `Extra` type.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub type ListFilesCache =
    Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
47
/// Generic file-embedded metadata used with [`FileMetadataCache`].
///
/// For example, Parquet footers and page metadata can be represented
/// using this trait.
///
/// Implementors must be [`Any`] + `Send` + `Sync`; the cache handles them as
/// `Arc<dyn FileMetadata>` trait objects.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait FileMetadata: Any + Send + Sync {
    /// Returns the file metadata as [`Any`] so that it can be downcast to a specific
    /// implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the size of the metadata in bytes.
    fn memory_size(&self) -> usize;

    /// Returns extra information about this entry as string key/value pairs
    /// (used by [`FileMetadataCache::list_entries`]).
    fn extra_info(&self) -> HashMap<String, String>;
}
65
/// Cache for file-embedded metadata.
///
/// This cache stores per-file metadata in the form of [`FileMetadata`].
///
/// For example, the built in [`ListingTable`] uses this cache to avoid parsing
/// Parquet footers multiple times for the same file.
///
/// DataFusion provides a default implementation, [`DefaultFilesMetadataCache`],
/// and users can also provide their own implementations to implement custom
/// caching strategies.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
///
/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
pub trait FileMetadataCache:
    CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
{
    /// Returns the cache's memory limit in bytes.
    fn cache_limit(&self) -> usize;

    /// Updates the cache with a new memory limit in bytes.
    ///
    /// Takes `&self`, so implementations need interior mutability to apply
    /// the new limit.
    fn update_cache_limit(&self, limit: usize);

    /// Retrieves the information about the entries currently cached,
    /// as a map from [`Path`] to [`FileMetadataCacheEntry`].
    fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
}
92
/// Represents information about a cached metadata entry.
///
/// This is used to expose the metadata cache contents to outside modules
/// (see [`FileMetadataCache::list_entries`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMetadataCacheEntry {
    /// Object store metadata ([`ObjectMeta`]) associated with this entry.
    pub object_meta: ObjectMeta,
    /// Size of the cached metadata, in bytes.
    pub size_bytes: usize,
    /// Number of times this entry was retrieved.
    pub hits: usize,
    /// Additional object-specific information.
    pub extra: HashMap<String, String>,
}
105
106impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
107 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
108 write!(f, "Cache name: {} with length: {}", self.name(), self.len())
109 }
110}
111
112impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta> {
113 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
114 write!(f, "Cache name: {} with length: {}", self.name(), self.len())
115 }
116}
117
118impl Debug for dyn FileMetadataCache {
119 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
120 write!(f, "Cache name: {} with length: {}", self.name(), self.len())
121 }
122}
123
/// Manages various caches used in DataFusion.
///
/// Following DataFusion design principles, DataFusion provides default cache
/// implementations, while also allowing users to provide their own custom cache
/// implementations by implementing the relevant traits.
///
/// See [`CacheManagerConfig`] for configuration options.
#[derive(Debug)]
pub struct CacheManager {
    /// Cache of file statistics; `None` when the configuration did not supply one.
    file_statistic_cache: Option<FileStatisticsCache>,
    /// Cache of path listing results; `None` when the configuration did not supply one.
    list_files_cache: Option<ListFilesCache>,
    /// Cache of file-embedded metadata. Always present: a
    /// [`DefaultFilesMetadataCache`] is created when the configuration does
    /// not supply one.
    file_metadata_cache: Arc<dyn FileMetadataCache>,
}
137
138impl CacheManager {
139 pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
140 let file_statistic_cache =
141 config.table_files_statistics_cache.as_ref().map(Arc::clone);
142
143 let list_files_cache = config.list_files_cache.as_ref().map(Arc::clone);
144
145 let file_metadata_cache = config
146 .file_metadata_cache
147 .as_ref()
148 .map(Arc::clone)
149 .unwrap_or_else(|| {
150 Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
151 });
152
153 // the cache memory limit might have changed, ensure the limit is updated
154 file_metadata_cache.update_cache_limit(config.metadata_cache_limit);
155
156 Ok(Arc::new(CacheManager {
157 file_statistic_cache,
158 list_files_cache,
159 file_metadata_cache,
160 }))
161 }
162
163 /// Get the cache of listing files statistics.
164 pub fn get_file_statistic_cache(&self) -> Option<FileStatisticsCache> {
165 self.file_statistic_cache.clone()
166 }
167
168 /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
169 pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
170 self.list_files_cache.clone()
171 }
172
173 /// Get the file embedded metadata cache.
174 pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
175 Arc::clone(&self.file_metadata_cache)
176 }
177
178 /// Get the limit of the file embedded metadata cache.
179 pub fn get_metadata_cache_limit(&self) -> usize {
180 self.file_metadata_cache.cache_limit()
181 }
182}
183
/// Default value of [`CacheManagerConfig::metadata_cache_limit`], in bytes (50 MiB).
const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M
185
186#[derive(Clone)]
187pub struct CacheManagerConfig {
188 /// Enable cache of files statistics when listing files.
189 /// Avoid get same file statistics repeatedly in same datafusion session.
190 /// Default is disable. Fow now only supports Parquet files.
191 pub table_files_statistics_cache: Option<FileStatisticsCache>,
192 /// Enable cache of file metadata when listing files.
193 /// This setting avoids listing file meta of the same path repeatedly
194 /// in same session, which may be expensive in certain situations (e.g. remote object storage).
195 /// Note that if this option is enabled, DataFusion will not see any updates to the underlying
196 /// location.
197 /// Default is disable.
198 pub list_files_cache: Option<ListFilesCache>,
199 /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
200 /// data file (e.g., Parquet footer and page metadata).
201 /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
202 pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
203 /// Limit of the file-embedded metadata cache, in bytes.
204 pub metadata_cache_limit: usize,
205}
206
207impl Default for CacheManagerConfig {
208 fn default() -> Self {
209 Self {
210 table_files_statistics_cache: Default::default(),
211 list_files_cache: Default::default(),
212 file_metadata_cache: Default::default(),
213 metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
214 }
215 }
216}
217
218impl CacheManagerConfig {
219 /// Set the cache for files statistics.
220 ///
221 /// Default is `None` (disabled).
222 pub fn with_files_statistics_cache(
223 mut self,
224 cache: Option<FileStatisticsCache>,
225 ) -> Self {
226 self.table_files_statistics_cache = cache;
227 self
228 }
229
230 /// Set the cache for listing files.
231 ///
232 /// Default is `None` (disabled).
233 pub fn with_list_files_cache(mut self, cache: Option<ListFilesCache>) -> Self {
234 self.list_files_cache = cache;
235 self
236 }
237
238 /// Sets the cache for file-embedded metadata.
239 ///
240 /// Default is a [`DefaultFilesMetadataCache`].
241 pub fn with_file_metadata_cache(
242 mut self,
243 cache: Option<Arc<dyn FileMetadataCache>>,
244 ) -> Self {
245 self.file_metadata_cache = cache;
246 self
247 }
248
249 /// Sets the limit of the file-embedded metadata cache, in bytes.
250 pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
251 self.metadata_cache_limit = limit;
252 self
253 }
254}