1use crate::cache::cache_manager::{
19 CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry,
20};
21use crate::cache::{CacheAccessor, TableScopedPath};
22use std::collections::HashMap;
23use std::sync::Mutex;
24
25pub use crate::cache::DefaultFilesMetadataCache;
26use crate::cache::lru_queue::LruQueue;
27use datafusion_common::TableReference;
28use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx};
29
/// Memory-bounded cache of per-file metadata (object metadata, statistics and
/// an optional ordering) keyed by [`TableScopedPath`].
///
/// All mutable state lives behind a single [`Mutex`]; every operation takes
/// `&self` and serializes on that lock. Memory accounting covers the estimated
/// heap size of both keys and values. `Default` builds a cache bounded by
/// [`DEFAULT_FILE_STATISTICS_MEMORY_LIMIT`].
#[derive(Default)]
pub struct DefaultFileStatisticsCache {
    state: Mutex<DefaultFileStatisticsCacheState>,
}
52
53impl DefaultFileStatisticsCache {
54 pub fn new(memory_limit: usize) -> Self {
55 Self {
56 state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)),
57 }
58 }
59
60 pub fn memory_used(&self) -> usize {
62 let state = self.state.lock().unwrap();
63 state.memory_used
64 }
65}
66
/// Mutable state of [`DefaultFileStatisticsCache`], always accessed through
/// the cache's mutex.
struct DefaultFileStatisticsCacheState {
    /// Cached entries. Eviction pops from this queue; NOTE(review): the
    /// eviction order is whatever `LruQueue::pop` yields (presumably
    /// least-recently-used first).
    lru_queue: LruQueue<TableScopedPath, CachedFileMetadata>,
    /// Upper bound, in bytes, for `memory_used`; enforced by `evict_entries`.
    memory_limit: usize,
    /// Sum of the estimated heap sizes of every stored key and value.
    memory_used: usize,
}
72
73pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 20 * 1024 * 1024; impl Default for DefaultFileStatisticsCacheState {
76 fn default() -> Self {
77 Self {
78 lru_queue: LruQueue::new(),
79 memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
80 memory_used: 0,
81 }
82 }
83}
84
85impl DefaultFileStatisticsCacheState {
86 fn new(memory_limit: usize) -> Self {
87 Self {
88 lru_queue: LruQueue::new(),
89 memory_limit,
90 memory_used: 0,
91 }
92 }
93 fn get(&mut self, key: &TableScopedPath) -> Option<CachedFileMetadata> {
94 self.lru_queue.get(key).cloned()
95 }
96
97 fn put(
98 &mut self,
99 key: &TableScopedPath,
100 value: CachedFileMetadata,
101 ) -> Option<CachedFileMetadata> {
102 let mut ctx = DFHeapSizeCtx::default();
103 let key_size = key.heap_size(&mut ctx);
104 let entry_size = value.heap_size(&mut ctx);
105
106 if entry_size + key_size > self.memory_limit {
107 return self.remove(key);
109 }
110
111 self.memory_used += entry_size;
112 self.memory_used += key_size;
113
114 let old_value = self.lru_queue.put(key.clone(), value);
115 if let Some(old_entry) = &old_value {
116 let mut ctx = DFHeapSizeCtx::default();
117 self.memory_used -= old_entry.heap_size(&mut ctx);
118 self.memory_used -= key_size;
119 }
120
121 self.evict_entries();
122
123 old_value
124 }
125
126 fn remove(&mut self, k: &TableScopedPath) -> Option<CachedFileMetadata> {
127 if let Some(old_entry) = self.lru_queue.remove(k) {
128 let mut ctx = DFHeapSizeCtx::default();
129 self.memory_used -= k.heap_size(&mut ctx);
130 self.memory_used -= old_entry.heap_size(&mut ctx);
131 Some(old_entry)
132 } else {
133 None
134 }
135 }
136
137 fn contains_key(&self, k: &TableScopedPath) -> bool {
138 self.lru_queue.contains_key(k)
139 }
140
141 fn len(&self) -> usize {
142 self.lru_queue.len()
143 }
144
145 fn clear(&mut self) {
146 self.lru_queue.clear();
147 self.memory_used = 0;
148 }
149
150 fn evict_entries(&mut self) {
151 while self.memory_used > self.memory_limit {
152 if let Some(removed) = self.lru_queue.pop() {
153 let mut ctx = DFHeapSizeCtx::default();
154 self.memory_used -= removed.0.heap_size(&mut ctx);
155 self.memory_used -= removed.1.heap_size(&mut ctx);
156 } else {
157 log::error!(
159 "File statistics cache memory accounting bug: memory_used={} but cache is empty. \
160 Please report this to the Apache DataFusion developers.",
161 self.memory_used
162 );
163 debug_assert!(
164 false,
165 "memory_used={} but cache is empty",
166 self.memory_used
167 );
168 self.memory_used = 0;
169 return;
170 }
171 }
172 }
173}
174impl CacheAccessor<TableScopedPath, CachedFileMetadata> for DefaultFileStatisticsCache {
175 fn get(&self, key: &TableScopedPath) -> Option<CachedFileMetadata> {
176 let mut state = self.state.lock().unwrap();
177 state.get(key)
178 }
179
180 fn put(
181 &self,
182 key: &TableScopedPath,
183 value: CachedFileMetadata,
184 ) -> Option<CachedFileMetadata> {
185 let mut state = self.state.lock().unwrap();
186 state.put(key, value)
187 }
188
189 fn remove(&self, key: &TableScopedPath) -> Option<CachedFileMetadata> {
190 let mut state = self.state.lock().unwrap();
191 state.remove(key)
192 }
193
194 fn contains_key(&self, k: &TableScopedPath) -> bool {
195 let state = self.state.lock().unwrap();
196 state.contains_key(k)
197 }
198
199 fn len(&self) -> usize {
200 let state = self.state.lock().unwrap();
201 state.len()
202 }
203
204 fn clear(&self) {
205 let mut state = self.state.lock().unwrap();
206 state.clear();
207 }
208
209 fn name(&self) -> String {
210 "DefaultFileStatisticsCache".to_string()
211 }
212}
213
214impl FileStatisticsCache for DefaultFileStatisticsCache {
215 fn cache_limit(&self) -> usize {
216 let state = self.state.lock().unwrap();
217 state.memory_limit
218 }
219
220 fn update_cache_limit(&self, limit: usize) {
221 let mut state = self.state.lock().unwrap();
222 state.memory_limit = limit;
223 state.evict_entries();
224 }
225
226 fn list_entries(&self) -> HashMap<TableScopedPath, FileStatisticsCacheEntry> {
227 let mut entries = HashMap::<TableScopedPath, FileStatisticsCacheEntry>::new();
228 let mut ctx = DFHeapSizeCtx::default();
229 for entry in self.state.lock().unwrap().lru_queue.list_entries() {
230 let path = entry.0.clone();
231 let cached = entry.1;
232 entries.insert(
233 path,
234 FileStatisticsCacheEntry {
235 object_meta: cached.meta.clone(),
236 num_rows: cached.statistics.num_rows,
237 num_columns: cached.statistics.column_statistics.len(),
238 table_size_bytes: cached.statistics.total_byte_size,
239 statistics_size_bytes: cached.statistics.heap_size(&mut ctx),
240 has_ordering: cached.ordering.is_some(),
241 },
242 );
243 }
244
245 entries
246 }
247
248 fn drop_table_entries(
249 &self,
250 table_ref: &Option<TableReference>,
251 ) -> datafusion_common::Result<()> {
252 let mut state = self.state.lock().unwrap();
253 let mut table_paths = vec![];
254 for (path, _) in state.lru_queue.list_entries() {
255 if path.table == *table_ref {
256 table_paths.push(path.clone());
257 }
258 }
259 for path in table_paths {
260 state.remove(&path);
261 }
262 Ok(())
263 }
264}
265
#[cfg(test)]
mod tests {
    // Unit tests for `DefaultFileStatisticsCache`: get/put semantics, ordering
    // storage, validity checks against changed `ObjectMeta`, `list_entries`
    // reporting, and memory-limit enforcement / eviction.
    use super::*;
    use crate::cache::cache_manager::{
        CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry,
    };
    use arrow::array::{Int32Array, ListArray, RecordBatch};
    use arrow::buffer::{OffsetBuffer, ScalarBuffer};
    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use chrono::DateTime;
    use datafusion_common::heap_size::DFHeapSizeCtx;
    use datafusion_common::stats::Precision;
    use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
    use datafusion_expr::ColumnarValue;
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    use object_store::ObjectMeta;
    use object_store::path::Path;
    use std::sync::Arc;

    /// Builds an `ObjectMeta` for `path` with the given `size` and a fixed
    /// modification timestamp, so only `path` and `size` vary between calls.
    fn create_test_meta(path: &str, size: u64) -> ObjectMeta {
        ObjectMeta {
            location: Path::from(path),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size,
            e_tag: None,
            version: None,
        }
    }

    /// Putting a value under an existing key replaces the old one, and a
    /// cached entry is reported stale (`is_valid_for` == false) for metadata
    /// of a different size.
    #[test]
    fn test_statistics_cache() {
        let meta = create_test_meta("test", 1024);
        let cache = DefaultFileStatisticsCache::default();

        let schema = Schema::new(vec![Field::new(
            "test_column",
            DataType::Timestamp(TimeUnit::Second, None),
            false,
        )]);

        let path = TableScopedPath {
            path: meta.location.clone(),
            table: None,
        };

        // Nothing cached yet.
        assert!(cache.get(&path).is_none());

        let cached_value = CachedFileMetadata::new(
            meta.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            None,
        );
        cache.put(&path, cached_value);

        let result = cache.get(&path);
        assert!(result.is_some());

        let cached = result.unwrap();
        assert!(cached.is_valid_for(&meta));

        // Same location but a different size: the key is equal, so the entry
        // is found, yet it must be stale for the new metadata.
        let meta2 = create_test_meta("test", 2048);

        let path_2 = TableScopedPath {
            path: meta2.location.clone(),
            table: None,
        };

        let cached = cache.get(&path_2).unwrap();
        assert!(!cached.is_valid_for(&meta2));

        let cached_value2 = CachedFileMetadata::new(
            meta2.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            None,
        );
        cache.put(&path_2, cached_value2);

        // The second put replaced the first entry instead of adding one.
        let entries = cache.list_entries();
        assert_eq!(entries.len(), 1);

        let path_3 = TableScopedPath {
            path: Path::from("test"),
            table: None,
        };

        let entry = entries.get(&path_3).unwrap();
        assert_eq!(entry.object_meta.size, 2048);
    }

    /// Minimal `PhysicalExpr` used only to build a non-empty `LexOrdering`;
    /// `evaluate` is never called by these tests.
    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    struct MockExpr {}

    impl std::fmt::Display for MockExpr {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "MockExpr")
        }
    }

    impl PhysicalExpr for MockExpr {
        fn data_type(
            &self,
            _input_schema: &Schema,
        ) -> datafusion_common::Result<DataType> {
            Ok(DataType::Int32)
        }

        fn nullable(&self, _input_schema: &Schema) -> datafusion_common::Result<bool> {
            Ok(false)
        }

        fn evaluate(
            &self,
            _batch: &RecordBatch,
        ) -> datafusion_common::Result<ColumnarValue> {
            unimplemented!()
        }

        fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn PhysicalExpr>>,
        ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
            assert!(children.is_empty());
            Ok(self)
        }

        fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "MockExpr")
        }
    }

    /// A single-expression ordering over `MockExpr`, built with
    /// `PhysicalSortExpr::new_default`.
    fn ordering() -> LexOrdering {
        let expr = Arc::new(MockExpr {}) as Arc<dyn PhysicalExpr>;
        LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap()
    }

    /// An ordering can be attached to an already-cached entry by reading it,
    /// setting `ordering`, and putting it back; `list_entries` then reports
    /// `has_ordering`.
    #[test]
    fn test_ordering_cache() {
        let meta = create_test_meta("test.parquet", 100);
        let cache = DefaultFileStatisticsCache::default();

        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let cached_value = CachedFileMetadata::new(
            meta.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            None,
        );

        let path = TableScopedPath {
            path: meta.location.clone(),
            table: None,
        };

        cache.put(&path, cached_value);

        let result = cache.get(&path).unwrap();
        assert!(result.ordering.is_none());

        // Read-modify-write: add an ordering only if the entry is still valid.
        let mut cached = cache.get(&path).unwrap();
        if cached.is_valid_for(&meta) && cached.ordering.is_none() {
            cached.ordering = Some(ordering());
        }
        cache.put(&path, cached);

        let result2 = cache.get(&path).unwrap();
        assert!(result2.ordering.is_some());

        let entries = cache.list_entries();
        assert_eq!(entries.len(), 1);
        assert!(entries.get(&path).unwrap().has_ordering);
    }

    /// After the file changes (new size), the cached entry is stale and a new
    /// put under the same key replaces it.
    #[test]
    fn test_cache_invalidation_on_file_modification() {
        let cache = DefaultFileStatisticsCache::default();
        let path = TableScopedPath {
            path: Path::from("test.parquet"),
            table: None,
        };
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let meta_v1 = create_test_meta("test.parquet", 100);

        let cached_value = CachedFileMetadata::new(
            meta_v1.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            None,
        );
        cache.put(&path, cached_value);

        // Simulated modification: same location, new size.
        let meta_v2 = create_test_meta("test.parquet", 200);

        let cached = cache.get(&path).unwrap();
        assert!(!cached.is_valid_for(&meta_v2));

        let new_cached = CachedFileMetadata::new(
            meta_v2.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            None,
        );
        cache.put(&path, new_cached);

        let result = cache.get(&path).unwrap();
        assert_eq!(result.meta.size, 200);
    }

    /// An entry with an ordering becomes stale when both size and
    /// modification time change; the replacement is valid only for the new
    /// metadata and keeps its own ordering.
    #[test]
    fn test_ordering_cache_invalidation_on_file_modification() {
        let cache = DefaultFileStatisticsCache::default();
        let path = TableScopedPath {
            path: Path::from("test.parquet"),
            table: None,
        };
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let meta_v1 = ObjectMeta {
            location: path.path.clone(),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size: 100,
            e_tag: None,
            version: None,
        };
        let ordering_v1 = ordering();
        let cached_v1 = CachedFileMetadata::new(
            meta_v1.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            Some(ordering_v1),
        );
        cache.put(&path, cached_v1);

        let cached = cache.get(&path).unwrap();
        assert!(cached.is_valid_for(&meta_v1));
        assert!(cached.ordering.is_some());

        // Simulated modification: later timestamp and new size.
        let meta_v2 = ObjectMeta {
            location: path.path.clone(),
            last_modified: DateTime::parse_from_rfc3339("2022-09-28T10:00:00+02:00")
                .unwrap()
                .into(),
            size: 200,
            e_tag: None,
            version: None,
        };

        let cached = cache.get(&path).unwrap();
        assert!(!cached.is_valid_for(&meta_v2));

        let ordering_v2 = ordering();
        let cached_v2 = CachedFileMetadata::new(
            meta_v2.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            Some(ordering_v2),
        );
        cache.put(&path, cached_v2);

        let cached = cache.get(&path).unwrap();
        assert!(!cached.is_valid_for(&meta_v1));

        assert!(cached.is_valid_for(&meta_v2));
        assert!(cached.ordering.is_some());
    }

    /// `list_entries` summarises every cached entry.
    #[test]
    fn test_list_entries() {
        let cache = DefaultFileStatisticsCache::default();
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let meta1 = create_test_meta("test1.parquet", 100);

        let cached_value = CachedFileMetadata::new(
            meta1.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            None,
        );

        let path_1 = TableScopedPath {
            path: meta1.location.clone(),
            table: None,
        };

        cache.put(&path_1, cached_value);
        let meta2 = create_test_meta("test2.parquet", 200);
        let cached_value = CachedFileMetadata::new(
            meta2.clone(),
            Arc::new(Statistics::new_unknown(&schema)),
            Some(ordering()),
        );

        let path_2 = TableScopedPath {
            path: meta2.location.clone(),
            table: None,
        };

        cache.put(&path_2, cached_value);

        // NOTE(review): 360 is the `DFHeapSize` estimate of
        // `Statistics::new_unknown` for this one-column schema; it will
        // change if the heap-size implementation changes.
        let entries = cache.list_entries();
        assert_eq!(
            entries,
            HashMap::from([
                (
                    path_1,
                    FileStatisticsCacheEntry {
                        object_meta: meta1,
                        num_rows: Precision::Absent,
                        num_columns: 1,
                        table_size_bytes: Precision::Absent,
                        statistics_size_bytes: 360,
                        has_ordering: false,
                    }
                ),
                (
                    path_2,
                    FileStatisticsCacheEntry {
                        object_meta: meta2,
                        num_rows: Precision::Absent,
                        num_columns: 1,
                        table_size_bytes: Precision::Absent,
                        statistics_size_bytes: 360,
                        has_ordering: true,
                    }
                ),
            ])
        );
    }

    /// With a limit that fits exactly two entries, inserting a third evicts
    /// the least recently accessed one. Re-putting an existing key does not
    /// change the memory charge, and `remove`/`clear` release memory.
    #[test]
    fn test_cache_entry_added_when_entries_are_within_cache_limit() {
        let (meta_1, value_1) = create_cached_file_metadata_with_stats("test1.parquet");
        let (meta_2, value_2) = create_cached_file_metadata_with_stats("test2.parquet");
        let (meta_3, value_3) = create_cached_file_metadata_with_stats("test3.parquet");

        let mut ctx = DFHeapSizeCtx::default();

        // Key size (the path string; `table` is None) plus value size for the
        // first two entries.
        let limit_for_2_entries = meta_1.location.as_ref().heap_size(&mut ctx)
            + value_1.heap_size(&mut ctx)
            + meta_2.location.as_ref().heap_size(&mut ctx)
            + value_2.heap_size(&mut ctx);

        let cache = DefaultFileStatisticsCache::new(limit_for_2_entries);
        let path_1 = TableScopedPath {
            path: meta_1.location.clone(),
            table: None,
        };

        let path_2 = TableScopedPath {
            path: meta_2.location.clone(),
            table: None,
        };

        cache.put(&path_1, value_1.clone());
        cache.put(&path_2, value_2.clone());

        assert_eq!(cache.len(), 2);
        assert_eq!(cache.memory_used(), limit_for_2_entries);

        // Access order after these gets: path_1, then path_2 (most recent).
        let result_1 = cache.get(&path_1);
        let result_2 = cache.get(&path_2);
        assert_eq!(result_1.unwrap(), value_1);
        assert_eq!(result_2.unwrap(), value_2);

        let path_3 = TableScopedPath {
            path: meta_3.location.clone(),
            table: None,
        };

        // Inserting a third entry must evict path_1. The memory charge stays
        // at the two-entry limit because all three entries have equal
        // estimated size (same-length names, identical statistics).
        cache.put(&path_3, value_3.clone());
        assert_eq!(cache.len(), 2);
        assert_eq!(cache.memory_used(), limit_for_2_entries);

        let result_1 = cache.get(&path_1);
        assert!(result_1.is_none());

        let result_2 = cache.get(&path_2);
        let result_3 = cache.get(&path_3);

        assert_eq!(result_2.unwrap(), value_2);
        assert_eq!(result_3.unwrap(), value_3);

        // Replacing an entry with an equal-sized value leaves the charge unchanged.
        cache.put(&path_3, value_3.clone());
        assert_eq!(cache.memory_used(), limit_for_2_entries);
        cache.put(&path_3, value_3.clone());
        assert_eq!(cache.memory_used(), limit_for_2_entries);

        let mut ctx = DFHeapSizeCtx::default();
        cache.remove(&path_2);
        assert_eq!(cache.len(), 1);
        assert_eq!(
            cache.memory_used(),
            meta_3.location.as_ref().heap_size(&mut ctx) + value_3.heap_size(&mut ctx)
        );

        cache.clear();
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.memory_used(), 0);
    }

    /// An entry whose value alone exceeds the limit is not stored and charges
    /// no memory.
    #[test]
    fn test_cache_rejects_entry_which_is_too_large() {
        let (meta, value) = create_cached_file_metadata_with_stats("test1.parquet");
        let mut ctx = DFHeapSizeCtx::default();
        let limit_less_than_the_entry = value.heap_size(&mut ctx) - 1;

        let cache = DefaultFileStatisticsCache::new(limit_less_than_the_entry);

        let path_1 = TableScopedPath {
            path: meta.location.clone(),
            table: None,
        };

        cache.put(&path_1, value);

        assert_eq!(cache.len(), 0);
        assert_eq!(cache.memory_used(), 0);
    }

    /// Builds a cache value with one column of exact statistics whose
    /// min/max/sum are `List` scalars (so the value has a non-trivial heap
    /// size). The `ObjectMeta` size is set to the statistics' heap size.
    fn create_cached_file_metadata_with_stats(
        file_name: &str,
    ) -> (ObjectMeta, CachedFileMetadata) {
        let series: Vec<i32> = (0..=10).collect();
        let values = Int32Array::from(series);
        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 11]));
        let field = Arc::new(Field::new_list_field(DataType::Int32, false));
        let list_array = ListArray::new(field, offsets, Arc::new(values), None);

        let column_statistics = ColumnStatistics {
            null_count: Precision::Exact(1),
            max_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))),
            min_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))),
            sum_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))),
            distinct_count: Precision::Exact(10),
            byte_size: Precision::Absent,
        };

        let stats = Statistics {
            num_rows: Precision::Exact(100),
            total_byte_size: Precision::Exact(100),
            column_statistics: vec![column_statistics.clone()],
        };
        let mut ctx = DFHeapSizeCtx::default();
        let object_meta = create_test_meta(file_name, stats.heap_size(&mut ctx) as u64);
        let value =
            CachedFileMetadata::new(object_meta.clone(), Arc::new(stats.clone()), None);
        (object_meta, value)
    }
}
744}