// datafusion_execution/cache/cache_unit.rs

use std::collections::HashMap;
19
20use crate::cache::CacheAccessor;
21use crate::cache::cache_manager::{
22 CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry,
23};
24
25use dashmap::DashMap;
26use object_store::path::Path;
27
28pub use crate::cache::DefaultFilesMetadataCache;
29
/// Default implementation of [`FileStatisticsCache`]: caches per-file
/// [`CachedFileMetadata`] (object metadata, statistics, and an optional
/// ordering) keyed by the file's object-store [`Path`].
///
/// Backed by a [`DashMap`], so entries can be read and written concurrently
/// without external locking.
#[derive(Default)]
pub struct DefaultFileStatisticsCache {
    /// Map from file path to its cached metadata.
    cache: DashMap<Path, CachedFileMetadata>,
}
46
47impl CacheAccessor<Path, CachedFileMetadata> for DefaultFileStatisticsCache {
48 fn get(&self, key: &Path) -> Option<CachedFileMetadata> {
49 self.cache.get(key).map(|entry| entry.value().clone())
50 }
51
52 fn put(&self, key: &Path, value: CachedFileMetadata) -> Option<CachedFileMetadata> {
53 self.cache.insert(key.clone(), value)
54 }
55
56 fn remove(&self, k: &Path) -> Option<CachedFileMetadata> {
57 self.cache.remove(k).map(|(_, entry)| entry)
58 }
59
60 fn contains_key(&self, k: &Path) -> bool {
61 self.cache.contains_key(k)
62 }
63
64 fn len(&self) -> usize {
65 self.cache.len()
66 }
67
68 fn clear(&self) {
69 self.cache.clear();
70 }
71
72 fn name(&self) -> String {
73 "DefaultFileStatisticsCache".to_string()
74 }
75}
76
77impl FileStatisticsCache for DefaultFileStatisticsCache {
78 fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry> {
79 let mut entries = HashMap::<Path, FileStatisticsCacheEntry>::new();
80
81 for entry in self.cache.iter() {
82 let path = entry.key();
83 let cached = entry.value();
84 entries.insert(
85 path.clone(),
86 FileStatisticsCacheEntry {
87 object_meta: cached.meta.clone(),
88 num_rows: cached.statistics.num_rows,
89 num_columns: cached.statistics.column_statistics.len(),
90 table_size_bytes: cached.statistics.total_byte_size,
91 statistics_size_bytes: 0, has_ordering: cached.ordering.is_some(),
93 },
94 );
95 }
96
97 entries
98 }
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::CacheAccessor;
    use crate::cache::cache_manager::{
        CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry,
    };
    use arrow::array::RecordBatch;
    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use chrono::DateTime;
    use datafusion_common::Statistics;
    use datafusion_common::stats::Precision;
    use datafusion_expr::ColumnarValue;
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    use object_store::ObjectMeta;
    use object_store::path::Path;
    use std::sync::Arc;

    /// Builds an `ObjectMeta` for `path` with the given `size` and a fixed
    /// last-modified timestamp, so tests can vary one attribute at a time.
    fn create_test_meta(path: &str, size: u64) -> ObjectMeta {
        ObjectMeta {
            location: Path::from(path),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size,
            e_tag: None,
            version: None,
        }
    }

    /// get/put round-trip plus staleness detection: an entry is
    /// `is_valid_for` the meta it was stored with, but not for a meta with a
    /// different size; re-putting under the same path overwrites in place.
    #[test]
    fn test_statistics_cache() {
        let meta = create_test_meta("test", 1024);
        let cache = DefaultFileStatisticsCache::default();

        let schema = Schema::new(vec![Field::new(
            "test_column",
            DataType::Timestamp(TimeUnit::Second, None),
            false,
        )]);

        // Cache starts empty.
        assert!(cache.get(&meta.location).is_none());

        let cached_value = CachedFileMetadata::new(
            meta.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            None,
        );
        cache.put(&meta.location, cached_value);

        let result = cache.get(&meta.location);
        assert!(result.is_some());
        let cached = result.unwrap();
        assert!(cached.is_valid_for(&meta));

        // Same path, different size: the cached entry must be reported stale.
        let meta2 = create_test_meta("test", 2048);
        let cached = cache.get(&meta2.location).unwrap();
        assert!(!cached.is_valid_for(&meta2));

        let cached_value2 = CachedFileMetadata::new(
            meta2.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            None,
        );
        cache.put(&meta2.location, cached_value2);

        // Overwrite, not append: still one entry, now with the new size.
        let entries = cache.list_entries();
        assert_eq!(entries.len(), 1);
        let entry = entries.get(&Path::from("test")).unwrap();
        assert_eq!(entry.object_meta.size, 2048);
    }

    /// Minimal `PhysicalExpr` used only to build an ordering for the cache;
    /// `evaluate` is never called by these tests.
    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    struct MockExpr {}

    impl std::fmt::Display for MockExpr {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "MockExpr")
        }
    }

    impl PhysicalExpr for MockExpr {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn data_type(
            &self,
            _input_schema: &Schema,
        ) -> datafusion_common::Result<DataType> {
            Ok(DataType::Int32)
        }

        fn nullable(&self, _input_schema: &Schema) -> datafusion_common::Result<bool> {
            Ok(false)
        }

        fn evaluate(
            &self,
            _batch: &RecordBatch,
        ) -> datafusion_common::Result<ColumnarValue> {
            // Not exercised by these tests.
            unimplemented!()
        }

        fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn PhysicalExpr>>,
        ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
            assert!(children.is_empty());
            Ok(self)
        }

        fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "MockExpr")
        }
    }

    /// A single-expression `LexOrdering` built from `MockExpr`.
    fn ordering() -> LexOrdering {
        let expr = Arc::new(MockExpr {}) as Arc<dyn PhysicalExpr>;
        // Non-empty input, so construction succeeds.
        LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap()
    }

    /// An ordering can be attached to an existing valid entry after the fact
    /// and is then visible via `get` and reflected in `list_entries`.
    #[test]
    fn test_ordering_cache() {
        let meta = create_test_meta("test.parquet", 100);
        let cache = DefaultFileStatisticsCache::default();

        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let cached_value = CachedFileMetadata::new(
            meta.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            None,
        );
        cache.put(&meta.location, cached_value);

        let result = cache.get(&meta.location).unwrap();
        assert!(result.ordering.is_none());

        // Read-modify-write: add an ordering to the still-valid entry.
        let mut cached = cache.get(&meta.location).unwrap();
        if cached.is_valid_for(&meta) && cached.ordering.is_none() {
            cached.ordering = Some(ordering());
        }
        cache.put(&meta.location, cached);

        let result2 = cache.get(&meta.location).unwrap();
        assert!(result2.ordering.is_some());

        let entries = cache.list_entries();
        assert_eq!(entries.len(), 1);
        assert!(entries.get(&meta.location).unwrap().has_ordering);
    }

    /// A cached entry is stale for a meta with a different size and can be
    /// replaced by a fresh entry under the same path.
    #[test]
    fn test_cache_invalidation_on_file_modification() {
        let cache = DefaultFileStatisticsCache::default();
        let path = Path::from("test.parquet");
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let meta_v1 = create_test_meta("test.parquet", 100);

        let cached_value = CachedFileMetadata::new(
            meta_v1.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            None,
        );
        cache.put(&path, cached_value);

        // Simulate the file growing from 100 to 200 bytes.
        let meta_v2 = create_test_meta("test.parquet", 200);

        let cached = cache.get(&path).unwrap();
        assert!(!cached.is_valid_for(&meta_v2));

        let new_cached = CachedFileMetadata::new(
            meta_v2.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            None,
        );
        cache.put(&path, new_cached);

        let result = cache.get(&path).unwrap();
        assert_eq!(result.meta.size, 200);
    }

    /// Same invalidation scenario, but with orderings attached: replacing a
    /// stale entry also replaces its cached ordering, and validity flips from
    /// the old meta to the new one (here both size and mtime change).
    #[test]
    fn test_ordering_cache_invalidation_on_file_modification() {
        let cache = DefaultFileStatisticsCache::default();
        let path = Path::from("test.parquet");
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let meta_v1 = ObjectMeta {
            location: path.clone(),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size: 100,
            e_tag: None,
            version: None,
        };
        let ordering_v1 = ordering();
        let cached_v1 = CachedFileMetadata::new(
            meta_v1.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            Some(ordering_v1),
        );
        cache.put(&path, cached_v1);

        let cached = cache.get(&path).unwrap();
        assert!(cached.is_valid_for(&meta_v1));
        assert!(cached.ordering.is_some());

        // File modified: later timestamp and different size.
        let meta_v2 = ObjectMeta {
            location: path.clone(),
            last_modified: DateTime::parse_from_rfc3339("2022-09-28T10:00:00+02:00")
                .unwrap()
                .into(),
            size: 200,
            e_tag: None,
            version: None,
        };

        let cached = cache.get(&path).unwrap();
        assert!(!cached.is_valid_for(&meta_v2));

        let ordering_v2 = ordering();
        let cached_v2 = CachedFileMetadata::new(
            meta_v2.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            Some(ordering_v2),
        );
        cache.put(&path, cached_v2);

        // After replacement the entry matches only the new meta.
        let cached = cache.get(&path).unwrap();
        assert!(!cached.is_valid_for(&meta_v1));

        assert!(cached.is_valid_for(&meta_v2));
        assert!(cached.ordering.is_some());
    }

    /// `list_entries` returns one summary per cached file, with unknown
    /// statistics surfacing as `Precision::Absent` and `has_ordering`
    /// reflecting whether an ordering was stored.
    #[test]
    fn test_list_entries() {
        let cache = DefaultFileStatisticsCache::default();
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let meta1 = create_test_meta("test1.parquet", 100);

        let cached_value = CachedFileMetadata::new(
            meta1.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            None,
        );
        cache.put(&meta1.location, cached_value);
        let meta2 = create_test_meta("test2.parquet", 200);
        let cached_value = CachedFileMetadata::new(
            meta2.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            Some(ordering()),
        );
        cache.put(&meta2.location, cached_value);

        let entries = cache.list_entries();
        assert_eq!(
            entries,
            HashMap::from([
                (
                    Path::from("test1.parquet"),
                    FileStatisticsCacheEntry {
                        object_meta: meta1,
                        num_rows: Precision::Absent,
                        num_columns: 1,
                        table_size_bytes: Precision::Absent,
                        statistics_size_bytes: 0,
                        has_ordering: false,
                    }
                ),
                (
                    Path::from("test2.parquet"),
                    FileStatisticsCacheEntry {
                        object_meta: meta2,
                        num_rows: Precision::Absent,
                        num_columns: 1,
                        table_size_bytes: Precision::Absent,
                        statistics_size_bytes: 0,
                        has_ordering: true,
                    }
                ),
            ])
        );
    }
}