// datafusion_execution/cache/cache_unit.rs

use std::sync::Arc;

use crate::cache::CacheAccessor;

use datafusion_common::Statistics;

use dashmap::DashMap;
use object_store::path::Path;
use object_store::ObjectMeta;
28#[derive(Default)]
31pub struct DefaultFileStatisticsCache {
32 statistics: DashMap<Path, (ObjectMeta, Arc<Statistics>)>,
33}
34
35impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
36 type Extra = ObjectMeta;
37
38 fn get(&self, k: &Path) -> Option<Arc<Statistics>> {
40 self.statistics
41 .get(k)
42 .map(|s| Some(Arc::clone(&s.value().1)))
43 .unwrap_or(None)
44 }
45
46 fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Arc<Statistics>> {
48 self.statistics
49 .get(k)
50 .map(|s| {
51 let (saved_meta, statistics) = s.value();
52 if saved_meta.size != e.size
53 || saved_meta.last_modified != e.last_modified
54 {
55 None
57 } else {
58 Some(Arc::clone(statistics))
59 }
60 })
61 .unwrap_or(None)
62 }
63
64 fn put(&self, _key: &Path, _value: Arc<Statistics>) -> Option<Arc<Statistics>> {
66 panic!("Put cache in DefaultFileStatisticsCache without Extra not supported.")
67 }
68
69 fn put_with_extra(
70 &self,
71 key: &Path,
72 value: Arc<Statistics>,
73 e: &Self::Extra,
74 ) -> Option<Arc<Statistics>> {
75 self.statistics
76 .insert(key.clone(), (e.clone(), value))
77 .map(|x| x.1)
78 }
79
80 fn remove(&mut self, k: &Path) -> Option<Arc<Statistics>> {
81 self.statistics.remove(k).map(|x| x.1 .1)
82 }
83
84 fn contains_key(&self, k: &Path) -> bool {
85 self.statistics.contains_key(k)
86 }
87
88 fn len(&self) -> usize {
89 self.statistics.len()
90 }
91
92 fn clear(&self) {
93 self.statistics.clear()
94 }
95 fn name(&self) -> String {
96 "DefaultFileStatisticsCache".to_string()
97 }
98}
99
100#[derive(Default)]
103pub struct DefaultListFilesCache {
104 statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
105}
106
107impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
108 type Extra = ObjectMeta;
109
110 fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
111 self.statistics.get(k).map(|x| Arc::clone(x.value()))
112 }
113
114 fn get_with_extra(
115 &self,
116 _k: &Path,
117 _e: &Self::Extra,
118 ) -> Option<Arc<Vec<ObjectMeta>>> {
119 panic!("Not supported DefaultListFilesCache get_with_extra")
120 }
121
122 fn put(
123 &self,
124 key: &Path,
125 value: Arc<Vec<ObjectMeta>>,
126 ) -> Option<Arc<Vec<ObjectMeta>>> {
127 self.statistics.insert(key.clone(), value)
128 }
129
130 fn put_with_extra(
131 &self,
132 _key: &Path,
133 _value: Arc<Vec<ObjectMeta>>,
134 _e: &Self::Extra,
135 ) -> Option<Arc<Vec<ObjectMeta>>> {
136 panic!("Not supported DefaultListFilesCache put_with_extra")
137 }
138
139 fn remove(&mut self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
140 self.statistics.remove(k).map(|x| x.1)
141 }
142
143 fn contains_key(&self, k: &Path) -> bool {
144 self.statistics.contains_key(k)
145 }
146
147 fn len(&self) -> usize {
148 self.statistics.len()
149 }
150
151 fn clear(&self) {
152 self.statistics.clear()
153 }
154
155 fn name(&self) -> String {
156 "DefaultListFilesCache".to_string()
157 }
158}
159
#[cfg(test)]
mod tests {
    use crate::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache};
    use crate::cache::CacheAccessor;
    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use chrono::DateTime;
    use datafusion_common::Statistics;
    use object_store::path::Path;
    use object_store::ObjectMeta;
    use std::sync::Arc;

    /// Metadata for a 1024-byte object at `test`, shared by the tests below.
    fn test_meta() -> ObjectMeta {
        ObjectMeta {
            location: Path::from("test"),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size: 1024,
            e_tag: None,
            version: None,
        }
    }

    /// Unknown statistics for a single-column schema.
    fn test_statistics() -> Statistics {
        Statistics::new_unknown(&Schema::new(vec![Field::new(
            "test_column",
            DataType::Timestamp(TimeUnit::Second, None),
            false,
        )]))
    }

    #[test]
    fn test_statistics_cache() {
        let meta = test_meta();
        let cache = DefaultFileStatisticsCache::default();
        assert!(cache.get_with_extra(&meta.location, &meta).is_none());

        cache.put_with_extra(&meta.location, test_statistics().into(), &meta);
        assert!(cache.get_with_extra(&meta.location, &meta).is_some());

        // file size changed
        let mut meta2 = meta.clone();
        meta2.size = 2048;
        assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());

        // file last_modified changed
        let mut meta2 = meta.clone();
        meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
            .unwrap()
            .into();
        assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());

        // different file
        let mut meta2 = meta;
        meta2.location = Path::from("test2");
        assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
    }

    #[test]
    fn test_statistics_cache_replace_remove_clear() {
        let meta = test_meta();
        let mut cache = DefaultFileStatisticsCache::default();
        let stats = Arc::new(test_statistics());

        assert!(cache
            .put_with_extra(&meta.location, Arc::clone(&stats), &meta)
            .is_none());
        // Re-inserting returns the statistics that were replaced.
        let replaced = cache
            .put_with_extra(&meta.location, Arc::clone(&stats), &meta)
            .unwrap();
        assert!(Arc::ptr_eq(&replaced, &stats));
        assert!(cache.contains_key(&meta.location));
        assert_eq!(cache.len(), 1);

        // `get` does not validate metadata, unlike `get_with_extra`.
        let mut changed = meta.clone();
        changed.size = 2048;
        assert!(cache.get(&changed.location).is_some());
        assert!(cache.get_with_extra(&changed.location, &changed).is_none());

        let removed = cache.remove(&meta.location).unwrap();
        assert!(Arc::ptr_eq(&removed, &stats));
        assert!(!cache.contains_key(&meta.location));
        assert_eq!(cache.len(), 0);

        cache.put_with_extra(&meta.location, stats, &meta);
        cache.clear();
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn test_list_file_cache() {
        let meta = test_meta();
        let mut cache = DefaultListFilesCache::default();
        assert!(cache.get(&meta.location).is_none());

        cache.put(&meta.location, vec![meta.clone()].into());
        assert_eq!(cache.get(&meta.location).unwrap().first().unwrap(), &meta);

        let removed = cache.remove(&meta.location).unwrap();
        assert_eq!(removed.len(), 1);
        assert_eq!(removed.first(), Some(&meta));
        assert!(cache.get(&meta.location).is_none());
    }
}