datafusion_execution/cache/
cache_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cache::CacheAccessor;
19use datafusion_common::{Result, Statistics};
20use object_store::path::Path;
21use object_store::ObjectMeta;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
/// Cache of per-file statistics collected when listing files.
///
/// When configured via [`CacheManagerConfig::with_files_statistics_cache`],
/// it avoids inferring the statistics of the same file repeatedly during the
/// session lifetime. The cache is stored in [`crate::runtime_env::RuntimeEnv`].
///
/// Entries are keyed by [`Path`] and hold an `Arc<Statistics>`; the accessor's
/// `Extra` type is [`ObjectMeta`].
pub type FileStatisticsCache =
    Arc<dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>>;
31
/// Cache of file listings.
///
/// Entries are keyed by [`Path`] and hold the [`ObjectMeta`] entries stored
/// for that path as an `Arc<Vec<ObjectMeta>>`; the accessor's `Extra` type is
/// [`ObjectMeta`].
///
/// Configured via [`CacheManagerConfig::with_list_files_cache`].
pub type ListFilesCache =
    Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
34
35impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
36    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
37        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
38    }
39}
40
41impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta> {
42    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
43        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
44    }
45}
46
/// Holds the optional caches selected through a [`CacheManagerConfig`].
///
/// Both caches are `None` by default.
#[derive(Default, Debug)]
pub struct CacheManager {
    /// Cache of per-file statistics; `None` when not configured.
    file_statistic_cache: Option<FileStatisticsCache>,
    /// Cache of file listings; `None` when not configured.
    list_files_cache: Option<ListFilesCache>,
}
52
53impl CacheManager {
54    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
55        let mut manager = CacheManager::default();
56        if let Some(cc) = &config.table_files_statistics_cache {
57            manager.file_statistic_cache = Some(Arc::clone(cc))
58        }
59        if let Some(lc) = &config.list_files_cache {
60            manager.list_files_cache = Some(Arc::clone(lc))
61        }
62        Ok(Arc::new(manager))
63    }
64
65    /// Get the cache of listing files statistics.
66    pub fn get_file_statistic_cache(&self) -> Option<FileStatisticsCache> {
67        self.file_statistic_cache.clone()
68    }
69
70    /// Get the cache of objectMeta under same path.
71    pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
72        self.list_files_cache.clone()
73    }
74}
75
/// Configuration selecting which caches a [`CacheManager`] uses.
///
/// Both caches default to `None`.
#[derive(Clone, Default)]
pub struct CacheManagerConfig {
    /// Enable caching of file statistics when listing files.
    /// Avoids fetching the statistics of the same file repeatedly within the
    /// same DataFusion session.
    /// Disabled by default. For now only Parquet files are supported.
    pub table_files_statistics_cache: Option<FileStatisticsCache>,
    /// Enable caching of file metadata when listing files.
    /// This setting avoids listing the file metadata of the same path repeatedly
    /// within the same session, which may be expensive in certain situations
    /// (e.g. remote object storage).
    /// Note that if this option is enabled, DataFusion will not see any updates
    /// to the underlying location.
    ///
    /// Disabled by default.
    pub list_files_cache: Option<ListFilesCache>,
}
90
91impl CacheManagerConfig {
92    pub fn with_files_statistics_cache(
93        mut self,
94        cache: Option<FileStatisticsCache>,
95    ) -> Self {
96        self.table_files_statistics_cache = cache;
97        self
98    }
99
100    pub fn with_list_files_cache(mut self, cache: Option<ListFilesCache>) -> Self {
101        self.list_files_cache = cache;
102        self
103    }
104}