Skip to main content

datafusion_execution/cache/
file_statistics_cache.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cache::cache_manager::{
19    CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry,
20};
21use crate::cache::{CacheAccessor, TableScopedPath};
22use std::collections::HashMap;
23use std::sync::Mutex;
24
25pub use crate::cache::DefaultFilesMetadataCache;
26use crate::cache::lru_queue::LruQueue;
27use datafusion_common::TableReference;
28use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx};
29
30/// Default implementation of [`FileStatisticsCache`]
31///
32/// Stores cached file metadata (statistics and orderings) for files.
33///
34/// The typical usage pattern is:
35/// 1. Call `get(path)` to check for cached value
36/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
37/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
38///
39/// # Internal details
40///
41/// The `memory_limit` controls the maximum size of the cache, which uses a
42/// Least Recently Used eviction algorithm. When adding a new entry, if the total
43/// size of the cached entries exceeds `memory_limit`, the least recently used entries
44/// are evicted until the total size is lower than `memory_limit`.
45///
46///
47/// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache
48#[derive(Default)]
49pub struct DefaultFileStatisticsCache {
50    state: Mutex<DefaultFileStatisticsCacheState>,
51}
52
53impl DefaultFileStatisticsCache {
54    pub fn new(memory_limit: usize) -> Self {
55        Self {
56            state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)),
57        }
58    }
59
60    /// Returns the size of the cached memory, in bytes.
61    pub fn memory_used(&self) -> usize {
62        let state = self.state.lock().unwrap();
63        state.memory_used
64    }
65}
66
67struct DefaultFileStatisticsCacheState {
68    lru_queue: LruQueue<TableScopedPath, CachedFileMetadata>,
69    memory_limit: usize,
70    memory_used: usize,
71}
72
73pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 20 * 1024 * 1024; // 20MiB
74
75impl Default for DefaultFileStatisticsCacheState {
76    fn default() -> Self {
77        Self {
78            lru_queue: LruQueue::new(),
79            memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
80            memory_used: 0,
81        }
82    }
83}
84
85impl DefaultFileStatisticsCacheState {
86    fn new(memory_limit: usize) -> Self {
87        Self {
88            lru_queue: LruQueue::new(),
89            memory_limit,
90            memory_used: 0,
91        }
92    }
93    fn get(&mut self, key: &TableScopedPath) -> Option<CachedFileMetadata> {
94        self.lru_queue.get(key).cloned()
95    }
96
97    fn put(
98        &mut self,
99        key: &TableScopedPath,
100        value: CachedFileMetadata,
101    ) -> Option<CachedFileMetadata> {
102        let mut ctx = DFHeapSizeCtx::default();
103        let key_size = key.heap_size(&mut ctx);
104        let entry_size = value.heap_size(&mut ctx);
105
106        if entry_size + key_size > self.memory_limit {
107            // Remove potential stale entry
108            return self.remove(key);
109        }
110
111        self.memory_used += entry_size;
112        self.memory_used += key_size;
113
114        let old_value = self.lru_queue.put(key.clone(), value);
115        if let Some(old_entry) = &old_value {
116            let mut ctx = DFHeapSizeCtx::default();
117            self.memory_used -= old_entry.heap_size(&mut ctx);
118            self.memory_used -= key_size;
119        }
120
121        self.evict_entries();
122
123        old_value
124    }
125
126    fn remove(&mut self, k: &TableScopedPath) -> Option<CachedFileMetadata> {
127        if let Some(old_entry) = self.lru_queue.remove(k) {
128            let mut ctx = DFHeapSizeCtx::default();
129            self.memory_used -= k.heap_size(&mut ctx);
130            self.memory_used -= old_entry.heap_size(&mut ctx);
131            Some(old_entry)
132        } else {
133            None
134        }
135    }
136
137    fn contains_key(&self, k: &TableScopedPath) -> bool {
138        self.lru_queue.contains_key(k)
139    }
140
141    fn len(&self) -> usize {
142        self.lru_queue.len()
143    }
144
145    fn clear(&mut self) {
146        self.lru_queue.clear();
147        self.memory_used = 0;
148    }
149
150    fn evict_entries(&mut self) {
151        while self.memory_used > self.memory_limit {
152            if let Some(removed) = self.lru_queue.pop() {
153                let mut ctx = DFHeapSizeCtx::default();
154                self.memory_used -= removed.0.heap_size(&mut ctx);
155                self.memory_used -= removed.1.heap_size(&mut ctx);
156            } else {
157                // cache is empty while memory_used > memory_limit, cannot happen
158                log::error!(
159                    "File statistics cache memory accounting bug: memory_used={} but cache is empty. \
160                     Please report this to the Apache DataFusion developers.",
161                    self.memory_used
162                );
163                debug_assert!(
164                    false,
165                    "memory_used={} but cache is empty",
166                    self.memory_used
167                );
168                self.memory_used = 0;
169                return;
170            }
171        }
172    }
173}
174impl CacheAccessor<TableScopedPath, CachedFileMetadata> for DefaultFileStatisticsCache {
175    fn get(&self, key: &TableScopedPath) -> Option<CachedFileMetadata> {
176        let mut state = self.state.lock().unwrap();
177        state.get(key)
178    }
179
180    fn put(
181        &self,
182        key: &TableScopedPath,
183        value: CachedFileMetadata,
184    ) -> Option<CachedFileMetadata> {
185        let mut state = self.state.lock().unwrap();
186        state.put(key, value)
187    }
188
189    fn remove(&self, key: &TableScopedPath) -> Option<CachedFileMetadata> {
190        let mut state = self.state.lock().unwrap();
191        state.remove(key)
192    }
193
194    fn contains_key(&self, k: &TableScopedPath) -> bool {
195        let state = self.state.lock().unwrap();
196        state.contains_key(k)
197    }
198
199    fn len(&self) -> usize {
200        let state = self.state.lock().unwrap();
201        state.len()
202    }
203
204    fn clear(&self) {
205        let mut state = self.state.lock().unwrap();
206        state.clear();
207    }
208
209    fn name(&self) -> String {
210        "DefaultFileStatisticsCache".to_string()
211    }
212}
213
214impl FileStatisticsCache for DefaultFileStatisticsCache {
215    fn cache_limit(&self) -> usize {
216        let state = self.state.lock().unwrap();
217        state.memory_limit
218    }
219
220    fn update_cache_limit(&self, limit: usize) {
221        let mut state = self.state.lock().unwrap();
222        state.memory_limit = limit;
223        state.evict_entries();
224    }
225
226    fn list_entries(&self) -> HashMap<TableScopedPath, FileStatisticsCacheEntry> {
227        let mut entries = HashMap::<TableScopedPath, FileStatisticsCacheEntry>::new();
228        let mut ctx = DFHeapSizeCtx::default();
229        for entry in self.state.lock().unwrap().lru_queue.list_entries() {
230            let path = entry.0.clone();
231            let cached = entry.1;
232            entries.insert(
233                path,
234                FileStatisticsCacheEntry {
235                    object_meta: cached.meta.clone(),
236                    num_rows: cached.statistics.num_rows,
237                    num_columns: cached.statistics.column_statistics.len(),
238                    table_size_bytes: cached.statistics.total_byte_size,
239                    statistics_size_bytes: cached.statistics.heap_size(&mut ctx),
240                    has_ordering: cached.ordering.is_some(),
241                },
242            );
243        }
244
245        entries
246    }
247
248    fn drop_table_entries(
249        &self,
250        table_ref: &Option<TableReference>,
251    ) -> datafusion_common::Result<()> {
252        let mut state = self.state.lock().unwrap();
253        let mut table_paths = vec![];
254        for (path, _) in state.lru_queue.list_entries() {
255            if path.table == *table_ref {
256                table_paths.push(path.clone());
257            }
258        }
259        for path in table_paths {
260            state.remove(&path);
261        }
262        Ok(())
263    }
264}
265
266#[cfg(test)]
267mod tests {
268    use super::*;
269    use crate::cache::cache_manager::{
270        CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry,
271    };
272    use arrow::array::{Int32Array, ListArray, RecordBatch};
273    use arrow::buffer::{OffsetBuffer, ScalarBuffer};
274    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
275    use chrono::DateTime;
276    use datafusion_common::heap_size::DFHeapSizeCtx;
277    use datafusion_common::stats::Precision;
278    use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
279    use datafusion_expr::ColumnarValue;
280    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
281    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
282    use object_store::ObjectMeta;
283    use object_store::path::Path;
284    use std::sync::Arc;
285
286    fn create_test_meta(path: &str, size: u64) -> ObjectMeta {
287        ObjectMeta {
288            location: Path::from(path),
289            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
290                .unwrap()
291                .into(),
292            size,
293            e_tag: None,
294            version: None,
295        }
296    }
297
298    #[test]
299    fn test_statistics_cache() {
300        let meta = create_test_meta("test", 1024);
301        let cache = DefaultFileStatisticsCache::default();
302
303        let schema = Schema::new(vec![Field::new(
304            "test_column",
305            DataType::Timestamp(TimeUnit::Second, None),
306            false,
307        )]);
308
309        let path = TableScopedPath {
310            path: meta.location.clone(),
311            table: None,
312        };
313
314        // Cache miss
315        assert!(cache.get(&path).is_none());
316
317        // Put a value
318        let cached_value = CachedFileMetadata::new(
319            meta.clone(),
320            Arc::new(Statistics::new_unknown(&schema)),
321            None,
322        );
323        cache.put(&path, cached_value);
324
325        // Cache hit
326        let result = cache.get(&path);
327        assert!(result.is_some());
328
329        let cached = result.unwrap();
330        assert!(cached.is_valid_for(&meta));
331
332        // File size changed - validation should fail
333        let meta2 = create_test_meta("test", 2048);
334
335        let path_2 = TableScopedPath {
336            path: meta2.location.clone(),
337            table: None,
338        };
339
340        let cached = cache.get(&path_2).unwrap();
341        assert!(!cached.is_valid_for(&meta2));
342
343        // Update with new value
344        let cached_value2 = CachedFileMetadata::new(
345            meta2.clone(),
346            Arc::new(Statistics::new_unknown(&schema)),
347            None,
348        );
349        cache.put(&path_2, cached_value2);
350
351        // Test list_entries
352        let entries = cache.list_entries();
353        assert_eq!(entries.len(), 1);
354
355        let path_3 = TableScopedPath {
356            path: Path::from("test"),
357            table: None,
358        };
359
360        let entry = entries.get(&path_3).unwrap();
361        assert_eq!(entry.object_meta.size, 2048); // Should be updated value
362    }
363
364    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
365    struct MockExpr {}
366
367    impl std::fmt::Display for MockExpr {
368        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
369            write!(f, "MockExpr")
370        }
371    }
372
373    impl PhysicalExpr for MockExpr {
374        fn data_type(
375            &self,
376            _input_schema: &Schema,
377        ) -> datafusion_common::Result<DataType> {
378            Ok(DataType::Int32)
379        }
380
381        fn nullable(&self, _input_schema: &Schema) -> datafusion_common::Result<bool> {
382            Ok(false)
383        }
384
385        fn evaluate(
386            &self,
387            _batch: &RecordBatch,
388        ) -> datafusion_common::Result<ColumnarValue> {
389            unimplemented!()
390        }
391
392        fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
393            vec![]
394        }
395
396        fn with_new_children(
397            self: Arc<Self>,
398            children: Vec<Arc<dyn PhysicalExpr>>,
399        ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
400            assert!(children.is_empty());
401            Ok(self)
402        }
403
404        fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
405            write!(f, "MockExpr")
406        }
407    }
408
409    fn ordering() -> LexOrdering {
410        let expr = Arc::new(MockExpr {}) as Arc<dyn PhysicalExpr>;
411        LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap()
412    }
413
414    #[test]
415    fn test_ordering_cache() {
416        let meta = create_test_meta("test.parquet", 100);
417        let cache = DefaultFileStatisticsCache::default();
418
419        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
420
421        // Cache statistics with no ordering
422        let cached_value = CachedFileMetadata::new(
423            meta.clone(),
424            Arc::new(Statistics::new_unknown(&schema)),
425            None, // No ordering yet
426        );
427
428        let path = TableScopedPath {
429            path: meta.location.clone(),
430            table: None,
431        };
432
433        cache.put(&path, cached_value);
434
435        let result = cache.get(&path).unwrap();
436        assert!(result.ordering.is_none());
437
438        // Update to add ordering
439        let mut cached = cache.get(&path).unwrap();
440        if cached.is_valid_for(&meta) && cached.ordering.is_none() {
441            cached.ordering = Some(ordering());
442        }
443        cache.put(&path, cached);
444
445        let result2 = cache.get(&path).unwrap();
446        assert!(result2.ordering.is_some());
447
448        // Verify list_entries shows has_ordering = true
449        let entries = cache.list_entries();
450        assert_eq!(entries.len(), 1);
451        assert!(entries.get(&path).unwrap().has_ordering);
452    }
453
454    #[test]
455    fn test_cache_invalidation_on_file_modification() {
456        let cache = DefaultFileStatisticsCache::default();
457        let path = TableScopedPath {
458            path: Path::from("test.parquet"),
459            table: None,
460        };
461        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
462
463        let meta_v1 = create_test_meta("test.parquet", 100);
464
465        // Cache initial value
466        let cached_value = CachedFileMetadata::new(
467            meta_v1.clone(),
468            Arc::new(Statistics::new_unknown(&schema)),
469            None,
470        );
471        cache.put(&path, cached_value);
472
473        // File modified (size changed)
474        let meta_v2 = create_test_meta("test.parquet", 200);
475
476        let cached = cache.get(&path).unwrap();
477        // Should not be valid for new meta
478        assert!(!cached.is_valid_for(&meta_v2));
479
480        // Compute new value and update
481        let new_cached = CachedFileMetadata::new(
482            meta_v2.clone(),
483            Arc::new(Statistics::new_unknown(&schema)),
484            None,
485        );
486        cache.put(&path, new_cached);
487
488        // Should have new metadata
489        let result = cache.get(&path).unwrap();
490        assert_eq!(result.meta.size, 200);
491    }
492
493    #[test]
494    fn test_ordering_cache_invalidation_on_file_modification() {
495        let cache = DefaultFileStatisticsCache::default();
496        let path = TableScopedPath {
497            path: Path::from("test.parquet"),
498            table: None,
499        };
500        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
501
502        // Cache with original metadata and ordering
503        let meta_v1 = ObjectMeta {
504            location: path.path.clone(),
505            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
506                .unwrap()
507                .into(),
508            size: 100,
509            e_tag: None,
510            version: None,
511        };
512        let ordering_v1 = ordering();
513        let cached_v1 = CachedFileMetadata::new(
514            meta_v1.clone(),
515            Arc::new(Statistics::new_unknown(&schema)),
516            Some(ordering_v1),
517        );
518        cache.put(&path, cached_v1);
519
520        // Verify cached ordering is valid
521        let cached = cache.get(&path).unwrap();
522        assert!(cached.is_valid_for(&meta_v1));
523        assert!(cached.ordering.is_some());
524
525        // File modified (size changed)
526        let meta_v2 = ObjectMeta {
527            location: path.path.clone(),
528            last_modified: DateTime::parse_from_rfc3339("2022-09-28T10:00:00+02:00")
529                .unwrap()
530                .into(),
531            size: 200, // Changed
532            e_tag: None,
533            version: None,
534        };
535
536        // Cache entry exists but should be invalid for new metadata
537        let cached = cache.get(&path).unwrap();
538        assert!(!cached.is_valid_for(&meta_v2));
539
540        // Cache new version with different ordering
541        let ordering_v2 = ordering(); // New ordering instance
542        let cached_v2 = CachedFileMetadata::new(
543            meta_v2.clone(),
544            Arc::new(Statistics::new_unknown(&schema)),
545            Some(ordering_v2),
546        );
547        cache.put(&path, cached_v2);
548
549        // Old metadata should be invalid
550        let cached = cache.get(&path).unwrap();
551        assert!(!cached.is_valid_for(&meta_v1));
552
553        // New metadata should be valid
554        assert!(cached.is_valid_for(&meta_v2));
555        assert!(cached.ordering.is_some());
556    }
557
558    #[test]
559    fn test_list_entries() {
560        let cache = DefaultFileStatisticsCache::default();
561        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
562
563        let meta1 = create_test_meta("test1.parquet", 100);
564
565        let cached_value = CachedFileMetadata::new(
566            meta1.clone(),
567            Arc::new(Statistics::new_unknown(&schema)),
568            None,
569        );
570
571        let path_1 = TableScopedPath {
572            path: meta1.location.clone(),
573            table: None,
574        };
575
576        cache.put(&path_1, cached_value);
577        let meta2 = create_test_meta("test2.parquet", 200);
578        let cached_value = CachedFileMetadata::new(
579            meta2.clone(),
580            Arc::new(Statistics::new_unknown(&schema)),
581            Some(ordering()),
582        );
583
584        let path_2 = TableScopedPath {
585            path: meta2.location.clone(),
586            table: None,
587        };
588
589        cache.put(&path_2, cached_value);
590
591        let entries = cache.list_entries();
592        assert_eq!(
593            entries,
594            HashMap::from([
595                (
596                    path_1,
597                    FileStatisticsCacheEntry {
598                        object_meta: meta1,
599                        num_rows: Precision::Absent,
600                        num_columns: 1,
601                        table_size_bytes: Precision::Absent,
602                        statistics_size_bytes: 360,
603                        has_ordering: false,
604                    }
605                ),
606                (
607                    path_2,
608                    FileStatisticsCacheEntry {
609                        object_meta: meta2,
610                        num_rows: Precision::Absent,
611                        num_columns: 1,
612                        table_size_bytes: Precision::Absent,
613                        statistics_size_bytes: 360,
614                        has_ordering: true,
615                    }
616                ),
617            ])
618        );
619    }
620
621    #[test]
622    fn test_cache_entry_added_when_entries_are_within_cache_limit() {
623        let (meta_1, value_1) = create_cached_file_metadata_with_stats("test1.parquet");
624        let (meta_2, value_2) = create_cached_file_metadata_with_stats("test2.parquet");
625        let (meta_3, value_3) = create_cached_file_metadata_with_stats("test3.parquet");
626
627        let mut ctx = DFHeapSizeCtx::default();
628
629        let limit_for_2_entries = meta_1.location.as_ref().heap_size(&mut ctx)
630            + value_1.heap_size(&mut ctx)
631            + meta_2.location.as_ref().heap_size(&mut ctx)
632            + value_2.heap_size(&mut ctx);
633
634        // create a cache with a limit which fits exactly 2 entries
635        let cache = DefaultFileStatisticsCache::new(limit_for_2_entries);
636        let path_1 = TableScopedPath {
637            path: meta_1.location.clone(),
638            table: None,
639        };
640
641        let path_2 = TableScopedPath {
642            path: meta_2.location.clone(),
643            table: None,
644        };
645
646        cache.put(&path_1, value_1.clone());
647        cache.put(&path_2, value_2.clone());
648
649        assert_eq!(cache.len(), 2);
650        assert_eq!(cache.memory_used(), limit_for_2_entries);
651
652        let result_1 = cache.get(&path_1);
653        let result_2 = cache.get(&path_2);
654        assert_eq!(result_1.unwrap(), value_1);
655        assert_eq!(result_2.unwrap(), value_2);
656
657        let path_3 = TableScopedPath {
658            path: meta_3.location.clone(),
659            table: None,
660        };
661
662        // adding the third entry evicts the first entry
663        cache.put(&path_3, value_3.clone());
664        assert_eq!(cache.len(), 2);
665        assert_eq!(cache.memory_used(), limit_for_2_entries);
666
667        let result_1 = cache.get(&path_1);
668        assert!(result_1.is_none());
669
670        let result_2 = cache.get(&path_2);
671        let result_3 = cache.get(&path_3);
672
673        assert_eq!(result_2.unwrap(), value_2);
674        assert_eq!(result_3.unwrap(), value_3);
675
676        // add the third entry again, making sure memory usage remains the same
677        cache.put(&path_3, value_3.clone());
678        assert_eq!(cache.memory_used(), limit_for_2_entries);
679        cache.put(&path_3, value_3.clone());
680        assert_eq!(cache.memory_used(), limit_for_2_entries);
681
682        let mut ctx = DFHeapSizeCtx::default();
683        cache.remove(&path_2);
684        assert_eq!(cache.len(), 1);
685        assert_eq!(
686            cache.memory_used(),
687            meta_3.location.as_ref().heap_size(&mut ctx) + value_3.heap_size(&mut ctx)
688        );
689
690        cache.clear();
691        assert_eq!(cache.len(), 0);
692        assert_eq!(cache.memory_used(), 0);
693    }
694
695    #[test]
696    fn test_cache_rejects_entry_which_is_too_large() {
697        let (meta, value) = create_cached_file_metadata_with_stats("test1.parquet");
698        let mut ctx = DFHeapSizeCtx::default();
699        let limit_less_than_the_entry = value.heap_size(&mut ctx) - 1;
700
701        // create a cache with a size less than the entry
702        let cache = DefaultFileStatisticsCache::new(limit_less_than_the_entry);
703
704        let path_1 = TableScopedPath {
705            path: meta.location.clone(),
706            table: None,
707        };
708
709        cache.put(&path_1, value);
710
711        assert_eq!(cache.len(), 0);
712        assert_eq!(cache.memory_used(), 0);
713    }
714
715    fn create_cached_file_metadata_with_stats(
716        file_name: &str,
717    ) -> (ObjectMeta, CachedFileMetadata) {
718        let series: Vec<i32> = (0..=10).collect();
719        let values = Int32Array::from(series);
720        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 11]));
721        let field = Arc::new(Field::new_list_field(DataType::Int32, false));
722        let list_array = ListArray::new(field, offsets, Arc::new(values), None);
723
724        let column_statistics = ColumnStatistics {
725            null_count: Precision::Exact(1),
726            max_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))),
727            min_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))),
728            sum_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))),
729            distinct_count: Precision::Exact(10),
730            byte_size: Precision::Absent,
731        };
732
733        let stats = Statistics {
734            num_rows: Precision::Exact(100),
735            total_byte_size: Precision::Exact(100),
736            column_statistics: vec![column_statistics.clone()],
737        };
738        let mut ctx = DFHeapSizeCtx::default();
739        let object_meta = create_test_meta(file_name, stats.heap_size(&mut ctx) as u64);
740        let value =
741            CachedFileMetadata::new(object_meta.clone(), Arc::new(stats.clone()), None);
742        (object_meta, value)
743    }
744}