Skip to main content

datafusion_execution/cache/
cache_unit.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19
20use crate::cache::CacheAccessor;
21use crate::cache::cache_manager::{
22    CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry,
23};
24
25use dashmap::DashMap;
26use object_store::path::Path;
27
28pub use crate::cache::DefaultFilesMetadataCache;
29
/// Default implementation of [`FileStatisticsCache`]
///
/// Stores cached file metadata (statistics and orderings) for files,
/// keyed by the file's object store [`Path`].
///
/// The typical usage pattern is:
/// 1. Call `get(path)` to check for cached value
/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
///
/// `get` returns whatever was last stored without checking it against the
/// file's current metadata, so callers are responsible for step 2.
///
/// Backed by a [`DashMap`], so every operation takes `&self` and the cache
/// can be shared between threads without an external lock. Note that
/// `DashMap` synchronizes internally using sharded locks: it is a concurrent
/// map, not a strictly lock-free one.
///
/// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache
#[derive(Default)]
pub struct DefaultFileStatisticsCache {
    /// Cached metadata for each file, keyed by object store path.
    cache: DashMap<Path, CachedFileMetadata>,
}
46
47impl CacheAccessor<Path, CachedFileMetadata> for DefaultFileStatisticsCache {
48    fn get(&self, key: &Path) -> Option<CachedFileMetadata> {
49        self.cache.get(key).map(|entry| entry.value().clone())
50    }
51
52    fn put(&self, key: &Path, value: CachedFileMetadata) -> Option<CachedFileMetadata> {
53        self.cache.insert(key.clone(), value)
54    }
55
56    fn remove(&self, k: &Path) -> Option<CachedFileMetadata> {
57        self.cache.remove(k).map(|(_, entry)| entry)
58    }
59
60    fn contains_key(&self, k: &Path) -> bool {
61        self.cache.contains_key(k)
62    }
63
64    fn len(&self) -> usize {
65        self.cache.len()
66    }
67
68    fn clear(&self) {
69        self.cache.clear();
70    }
71
72    fn name(&self) -> String {
73        "DefaultFileStatisticsCache".to_string()
74    }
75}
76
77impl FileStatisticsCache for DefaultFileStatisticsCache {
78    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry> {
79        let mut entries = HashMap::<Path, FileStatisticsCacheEntry>::new();
80
81        for entry in self.cache.iter() {
82            let path = entry.key();
83            let cached = entry.value();
84            entries.insert(
85                path.clone(),
86                FileStatisticsCacheEntry {
87                    object_meta: cached.meta.clone(),
88                    num_rows: cached.statistics.num_rows,
89                    num_columns: cached.statistics.column_statistics.len(),
90                    table_size_bytes: cached.statistics.total_byte_size,
91                    statistics_size_bytes: 0, // TODO: set to the real size in the future
92                    has_ordering: cached.ordering.is_some(),
93                },
94            );
95        }
96
97        entries
98    }
99}
100
101#[cfg(test)]
102mod tests {
103    use super::*;
104    use crate::cache::CacheAccessor;
105    use crate::cache::cache_manager::{
106        CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry,
107    };
108    use arrow::array::RecordBatch;
109    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
110    use chrono::DateTime;
111    use datafusion_common::Statistics;
112    use datafusion_common::stats::Precision;
113    use datafusion_expr::ColumnarValue;
114    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
115    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
116    use object_store::ObjectMeta;
117    use object_store::path::Path;
118    use std::sync::Arc;
119
120    fn create_test_meta(path: &str, size: u64) -> ObjectMeta {
121        ObjectMeta {
122            location: Path::from(path),
123            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
124                .unwrap()
125                .into(),
126            size,
127            e_tag: None,
128            version: None,
129        }
130    }
131
132    #[test]
133    fn test_statistics_cache() {
134        let meta = create_test_meta("test", 1024);
135        let cache = DefaultFileStatisticsCache::default();
136
137        let schema = Schema::new(vec![Field::new(
138            "test_column",
139            DataType::Timestamp(TimeUnit::Second, None),
140            false,
141        )]);
142
143        // Cache miss
144        assert!(cache.get(&meta.location).is_none());
145
146        // Put a value
147        let cached_value = CachedFileMetadata::new(
148            meta.clone(),
149            Arc::new(Statistics::new_unknown(&schema)),
150            None,
151        );
152        cache.put(&meta.location, cached_value);
153
154        // Cache hit
155        let result = cache.get(&meta.location);
156        assert!(result.is_some());
157        let cached = result.unwrap();
158        assert!(cached.is_valid_for(&meta));
159
160        // File size changed - validation should fail
161        let meta2 = create_test_meta("test", 2048);
162        let cached = cache.get(&meta2.location).unwrap();
163        assert!(!cached.is_valid_for(&meta2));
164
165        // Update with new value
166        let cached_value2 = CachedFileMetadata::new(
167            meta2.clone(),
168            Arc::new(Statistics::new_unknown(&schema)),
169            None,
170        );
171        cache.put(&meta2.location, cached_value2);
172
173        // Test list_entries
174        let entries = cache.list_entries();
175        assert_eq!(entries.len(), 1);
176        let entry = entries.get(&Path::from("test")).unwrap();
177        assert_eq!(entry.object_meta.size, 2048); // Should be updated value
178    }
179
180    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
181    struct MockExpr {}
182
183    impl std::fmt::Display for MockExpr {
184        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185            write!(f, "MockExpr")
186        }
187    }
188
189    impl PhysicalExpr for MockExpr {
190        fn as_any(&self) -> &dyn std::any::Any {
191            self
192        }
193
194        fn data_type(
195            &self,
196            _input_schema: &Schema,
197        ) -> datafusion_common::Result<DataType> {
198            Ok(DataType::Int32)
199        }
200
201        fn nullable(&self, _input_schema: &Schema) -> datafusion_common::Result<bool> {
202            Ok(false)
203        }
204
205        fn evaluate(
206            &self,
207            _batch: &RecordBatch,
208        ) -> datafusion_common::Result<ColumnarValue> {
209            unimplemented!()
210        }
211
212        fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
213            vec![]
214        }
215
216        fn with_new_children(
217            self: Arc<Self>,
218            children: Vec<Arc<dyn PhysicalExpr>>,
219        ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
220            assert!(children.is_empty());
221            Ok(self)
222        }
223
224        fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225            write!(f, "MockExpr")
226        }
227    }
228
229    fn ordering() -> LexOrdering {
230        let expr = Arc::new(MockExpr {}) as Arc<dyn PhysicalExpr>;
231        LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap()
232    }
233
234    #[test]
235    fn test_ordering_cache() {
236        let meta = create_test_meta("test.parquet", 100);
237        let cache = DefaultFileStatisticsCache::default();
238
239        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
240
241        // Cache statistics with no ordering
242        let cached_value = CachedFileMetadata::new(
243            meta.clone(),
244            Arc::new(Statistics::new_unknown(&schema)),
245            None, // No ordering yet
246        );
247        cache.put(&meta.location, cached_value);
248
249        let result = cache.get(&meta.location).unwrap();
250        assert!(result.ordering.is_none());
251
252        // Update to add ordering
253        let mut cached = cache.get(&meta.location).unwrap();
254        if cached.is_valid_for(&meta) && cached.ordering.is_none() {
255            cached.ordering = Some(ordering());
256        }
257        cache.put(&meta.location, cached);
258
259        let result2 = cache.get(&meta.location).unwrap();
260        assert!(result2.ordering.is_some());
261
262        // Verify list_entries shows has_ordering = true
263        let entries = cache.list_entries();
264        assert_eq!(entries.len(), 1);
265        assert!(entries.get(&meta.location).unwrap().has_ordering);
266    }
267
268    #[test]
269    fn test_cache_invalidation_on_file_modification() {
270        let cache = DefaultFileStatisticsCache::default();
271        let path = Path::from("test.parquet");
272        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
273
274        let meta_v1 = create_test_meta("test.parquet", 100);
275
276        // Cache initial value
277        let cached_value = CachedFileMetadata::new(
278            meta_v1.clone(),
279            Arc::new(Statistics::new_unknown(&schema)),
280            None,
281        );
282        cache.put(&path, cached_value);
283
284        // File modified (size changed)
285        let meta_v2 = create_test_meta("test.parquet", 200);
286
287        let cached = cache.get(&path).unwrap();
288        // Should not be valid for new meta
289        assert!(!cached.is_valid_for(&meta_v2));
290
291        // Compute new value and update
292        let new_cached = CachedFileMetadata::new(
293            meta_v2.clone(),
294            Arc::new(Statistics::new_unknown(&schema)),
295            None,
296        );
297        cache.put(&path, new_cached);
298
299        // Should have new metadata
300        let result = cache.get(&path).unwrap();
301        assert_eq!(result.meta.size, 200);
302    }
303
304    #[test]
305    fn test_ordering_cache_invalidation_on_file_modification() {
306        let cache = DefaultFileStatisticsCache::default();
307        let path = Path::from("test.parquet");
308        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
309
310        // Cache with original metadata and ordering
311        let meta_v1 = ObjectMeta {
312            location: path.clone(),
313            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
314                .unwrap()
315                .into(),
316            size: 100,
317            e_tag: None,
318            version: None,
319        };
320        let ordering_v1 = ordering();
321        let cached_v1 = CachedFileMetadata::new(
322            meta_v1.clone(),
323            Arc::new(Statistics::new_unknown(&schema)),
324            Some(ordering_v1),
325        );
326        cache.put(&path, cached_v1);
327
328        // Verify cached ordering is valid
329        let cached = cache.get(&path).unwrap();
330        assert!(cached.is_valid_for(&meta_v1));
331        assert!(cached.ordering.is_some());
332
333        // File modified (size changed)
334        let meta_v2 = ObjectMeta {
335            location: path.clone(),
336            last_modified: DateTime::parse_from_rfc3339("2022-09-28T10:00:00+02:00")
337                .unwrap()
338                .into(),
339            size: 200, // Changed
340            e_tag: None,
341            version: None,
342        };
343
344        // Cache entry exists but should be invalid for new metadata
345        let cached = cache.get(&path).unwrap();
346        assert!(!cached.is_valid_for(&meta_v2));
347
348        // Cache new version with different ordering
349        let ordering_v2 = ordering(); // New ordering instance
350        let cached_v2 = CachedFileMetadata::new(
351            meta_v2.clone(),
352            Arc::new(Statistics::new_unknown(&schema)),
353            Some(ordering_v2),
354        );
355        cache.put(&path, cached_v2);
356
357        // Old metadata should be invalid
358        let cached = cache.get(&path).unwrap();
359        assert!(!cached.is_valid_for(&meta_v1));
360
361        // New metadata should be valid
362        assert!(cached.is_valid_for(&meta_v2));
363        assert!(cached.ordering.is_some());
364    }
365
366    #[test]
367    fn test_list_entries() {
368        let cache = DefaultFileStatisticsCache::default();
369        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
370
371        let meta1 = create_test_meta("test1.parquet", 100);
372
373        let cached_value = CachedFileMetadata::new(
374            meta1.clone(),
375            Arc::new(Statistics::new_unknown(&schema)),
376            None,
377        );
378        cache.put(&meta1.location, cached_value);
379        let meta2 = create_test_meta("test2.parquet", 200);
380        let cached_value = CachedFileMetadata::new(
381            meta2.clone(),
382            Arc::new(Statistics::new_unknown(&schema)),
383            Some(ordering()),
384        );
385        cache.put(&meta2.location, cached_value);
386
387        let entries = cache.list_entries();
388        assert_eq!(
389            entries,
390            HashMap::from([
391                (
392                    Path::from("test1.parquet"),
393                    FileStatisticsCacheEntry {
394                        object_meta: meta1,
395                        num_rows: Precision::Absent,
396                        num_columns: 1,
397                        table_size_bytes: Precision::Absent,
398                        statistics_size_bytes: 0,
399                        has_ordering: false,
400                    }
401                ),
402                (
403                    Path::from("test2.parquet"),
404                    FileStatisticsCacheEntry {
405                        object_meta: meta2,
406                        num_rows: Precision::Absent,
407                        num_columns: 1,
408                        table_size_bytes: Precision::Absent,
409                        statistics_size_bytes: 0,
410                        has_ordering: true,
411                    }
412                ),
413            ])
414        );
415    }
416}