datafusion_execution/cache/
cache_unit.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::cache::CacheAccessor;
21
22use datafusion_common::Statistics;
23
24use dashmap::DashMap;
25use object_store::path::Path;
26use object_store::ObjectMeta;
27
28/// Collected statistics for files
29/// Cache is invalided when file size or last modification has changed
30#[derive(Default)]
31pub struct DefaultFileStatisticsCache {
32    statistics: DashMap<Path, (ObjectMeta, Arc<Statistics>)>,
33}
34
35impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
36    type Extra = ObjectMeta;
37
38    /// Get `Statistics` for file location.
39    fn get(&self, k: &Path) -> Option<Arc<Statistics>> {
40        self.statistics
41            .get(k)
42            .map(|s| Some(Arc::clone(&s.value().1)))
43            .unwrap_or(None)
44    }
45
46    /// Get `Statistics` for file location. Returns None if file has changed or not found.
47    fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Arc<Statistics>> {
48        self.statistics
49            .get(k)
50            .map(|s| {
51                let (saved_meta, statistics) = s.value();
52                if saved_meta.size != e.size
53                    || saved_meta.last_modified != e.last_modified
54                {
55                    // file has changed
56                    None
57                } else {
58                    Some(Arc::clone(statistics))
59                }
60            })
61            .unwrap_or(None)
62    }
63
64    /// Save collected file statistics
65    fn put(&self, _key: &Path, _value: Arc<Statistics>) -> Option<Arc<Statistics>> {
66        panic!("Put cache in DefaultFileStatisticsCache without Extra not supported.")
67    }
68
69    fn put_with_extra(
70        &self,
71        key: &Path,
72        value: Arc<Statistics>,
73        e: &Self::Extra,
74    ) -> Option<Arc<Statistics>> {
75        self.statistics
76            .insert(key.clone(), (e.clone(), value))
77            .map(|x| x.1)
78    }
79
80    fn remove(&mut self, k: &Path) -> Option<Arc<Statistics>> {
81        self.statistics.remove(k).map(|x| x.1 .1)
82    }
83
84    fn contains_key(&self, k: &Path) -> bool {
85        self.statistics.contains_key(k)
86    }
87
88    fn len(&self) -> usize {
89        self.statistics.len()
90    }
91
92    fn clear(&self) {
93        self.statistics.clear()
94    }
95    fn name(&self) -> String {
96        "DefaultFileStatisticsCache".to_string()
97    }
98}
99
100/// Collected files metadata for listing files.
101/// Cache will not invalided until user call remove or clear.
102#[derive(Default)]
103pub struct DefaultListFilesCache {
104    statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
105}
106
107impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
108    type Extra = ObjectMeta;
109
110    fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
111        self.statistics.get(k).map(|x| Arc::clone(x.value()))
112    }
113
114    fn get_with_extra(
115        &self,
116        _k: &Path,
117        _e: &Self::Extra,
118    ) -> Option<Arc<Vec<ObjectMeta>>> {
119        panic!("Not supported DefaultListFilesCache get_with_extra")
120    }
121
122    fn put(
123        &self,
124        key: &Path,
125        value: Arc<Vec<ObjectMeta>>,
126    ) -> Option<Arc<Vec<ObjectMeta>>> {
127        self.statistics.insert(key.clone(), value)
128    }
129
130    fn put_with_extra(
131        &self,
132        _key: &Path,
133        _value: Arc<Vec<ObjectMeta>>,
134        _e: &Self::Extra,
135    ) -> Option<Arc<Vec<ObjectMeta>>> {
136        panic!("Not supported DefaultListFilesCache put_with_extra")
137    }
138
139    fn remove(&mut self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
140        self.statistics.remove(k).map(|x| x.1)
141    }
142
143    fn contains_key(&self, k: &Path) -> bool {
144        self.statistics.contains_key(k)
145    }
146
147    fn len(&self) -> usize {
148        self.statistics.len()
149    }
150
151    fn clear(&self) {
152        self.statistics.clear()
153    }
154
155    fn name(&self) -> String {
156        "DefaultListFilesCache".to_string()
157    }
158}
159
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::{DefaultFileStatisticsCache, DefaultListFilesCache};
    use crate::cache::CacheAccessor;
    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use chrono::DateTime;
    use datafusion_common::Statistics;
    use object_store::path::Path;
    use object_store::ObjectMeta;

    /// Metadata of the single file both tests start from.
    fn test_meta() -> ObjectMeta {
        ObjectMeta {
            location: Path::from("test"),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size: 1024,
            e_tag: None,
            version: None,
        }
    }

    #[test]
    fn test_statistics_cache() {
        let meta = test_meta();
        let cache = DefaultFileStatisticsCache::default();

        // nothing cached yet
        assert!(cache.get_with_extra(&meta.location, &meta).is_none());

        let schema = Schema::new(vec![Field::new(
            "test_column",
            DataType::Timestamp(TimeUnit::Second, None),
            false,
        )]);
        cache.put_with_extra(
            &meta.location,
            Arc::new(Statistics::new_unknown(&schema)),
            &meta,
        );
        assert!(cache.get_with_extra(&meta.location, &meta).is_some());

        // file size changed
        let resized = ObjectMeta {
            size: 2048,
            ..meta.clone()
        };
        assert!(cache.get_with_extra(&resized.location, &resized).is_none());

        // file last_modified changed
        let touched = ObjectMeta {
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
                .unwrap()
                .into(),
            ..meta.clone()
        };
        assert!(cache.get_with_extra(&touched.location, &touched).is_none());

        // different file
        let other = ObjectMeta {
            location: Path::from("test2"),
            ..meta
        };
        assert!(cache.get_with_extra(&other.location, &other).is_none());
    }

    #[test]
    fn test_list_file_cache() {
        let meta = test_meta();
        let cache = DefaultListFilesCache::default();

        // nothing cached yet
        assert!(cache.get(&meta.location).is_none());

        cache.put(&meta.location, Arc::new(vec![meta.clone()]));

        // the cached listing holds exactly the metadata that was stored
        let listed = cache.get(&meta.location).unwrap();
        assert_eq!(listed.first(), Some(&meta));
    }
}