datafusion_execution/cache/
cache_unit.rs1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::cache::CacheAccessor;
22use crate::cache::cache_manager::{FileStatisticsCache, FileStatisticsCacheEntry};
23
24use datafusion_common::Statistics;
25
26use dashmap::DashMap;
27use object_store::ObjectMeta;
28use object_store::path::Path;
29
30pub use crate::cache::DefaultFilesMetadataCache;
31
32#[derive(Default)]
40pub struct DefaultFileStatisticsCache {
41 statistics: DashMap<Path, (ObjectMeta, Arc<Statistics>)>,
42}
43
44impl FileStatisticsCache for DefaultFileStatisticsCache {
45 fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry> {
46 let mut entries = HashMap::<Path, FileStatisticsCacheEntry>::new();
47
48 for entry in &self.statistics {
49 let path = entry.key();
50 let (object_meta, stats) = entry.value();
51 entries.insert(
52 path.clone(),
53 FileStatisticsCacheEntry {
54 object_meta: object_meta.clone(),
55 num_rows: stats.num_rows,
56 num_columns: stats.column_statistics.len(),
57 table_size_bytes: stats.total_byte_size,
58 statistics_size_bytes: 0, },
60 );
61 }
62
63 entries
64 }
65}
66
67impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
68 type Extra = ObjectMeta;
69
70 fn get(&self, k: &Path) -> Option<Arc<Statistics>> {
72 self.statistics
73 .get(k)
74 .map(|s| Some(Arc::clone(&s.value().1)))
75 .unwrap_or(None)
76 }
77
78 fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Arc<Statistics>> {
80 self.statistics
81 .get(k)
82 .map(|s| {
83 let (saved_meta, statistics) = s.value();
84 if saved_meta.size != e.size
85 || saved_meta.last_modified != e.last_modified
86 {
87 None
89 } else {
90 Some(Arc::clone(statistics))
91 }
92 })
93 .unwrap_or(None)
94 }
95
96 fn put(&self, _key: &Path, _value: Arc<Statistics>) -> Option<Arc<Statistics>> {
98 panic!("Put cache in DefaultFileStatisticsCache without Extra not supported.")
99 }
100
101 fn put_with_extra(
102 &self,
103 key: &Path,
104 value: Arc<Statistics>,
105 e: &Self::Extra,
106 ) -> Option<Arc<Statistics>> {
107 self.statistics
108 .insert(key.clone(), (e.clone(), value))
109 .map(|x| x.1)
110 }
111
112 fn remove(&self, k: &Path) -> Option<Arc<Statistics>> {
113 self.statistics.remove(k).map(|x| x.1.1)
114 }
115
116 fn contains_key(&self, k: &Path) -> bool {
117 self.statistics.contains_key(k)
118 }
119
120 fn len(&self) -> usize {
121 self.statistics.len()
122 }
123
124 fn clear(&self) {
125 self.statistics.clear()
126 }
127 fn name(&self) -> String {
128 "DefaultFileStatisticsCache".to_string()
129 }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::CacheAccessor;
    use crate::cache::cache_manager::{FileStatisticsCache, FileStatisticsCacheEntry};
    use crate::cache::cache_unit::DefaultFileStatisticsCache;
    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use chrono::DateTime;
    use datafusion_common::Statistics;
    use datafusion_common::stats::Precision;
    use object_store::ObjectMeta;
    use object_store::path::Path;

    /// Stores statistics for one file and checks that lookups hit only when
    /// the supplied metadata matches what was stored, then checks the listing.
    #[test]
    fn test_statistics_cache() {
        let stored_mtime =
            DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00").unwrap();
        let meta = ObjectMeta {
            location: Path::from("test"),
            last_modified: stored_mtime.into(),
            size: 1024,
            e_tag: None,
            version: None,
        };
        let cache = DefaultFileStatisticsCache::default();

        // Nothing has been cached yet.
        assert!(cache.get_with_extra(&meta.location, &meta).is_none());

        let schema = Schema::new(vec![Field::new(
            "test_column",
            DataType::Timestamp(TimeUnit::Second, None),
            false,
        )]);
        let stats: Arc<Statistics> = Arc::new(Statistics::new_unknown(&schema));
        cache.put_with_extra(&meta.location, stats, &meta);

        // Same location and metadata: hit.
        assert!(cache.get_with_extra(&meta.location, &meta).is_some());

        // File size changed: miss.
        let resized = ObjectMeta {
            size: 2048,
            ..meta.clone()
        };
        assert!(cache.get_with_extra(&resized.location, &resized).is_none());

        // Last-modified time changed: miss.
        let touched = ObjectMeta {
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
                .unwrap()
                .into(),
            ..meta.clone()
        };
        assert!(cache.get_with_extra(&touched.location, &touched).is_none());

        // Different file: miss.
        let other_file = ObjectMeta {
            location: Path::from("test2"),
            ..meta.clone()
        };
        assert!(
            cache
                .get_with_extra(&other_file.location, &other_file)
                .is_none()
        );

        // The listing contains exactly the one entry that was stored.
        let entries = cache.list_entries();
        let expected_entry = FileStatisticsCacheEntry {
            object_meta: meta,
            num_rows: Precision::Absent,
            num_columns: 1,
            table_size_bytes: Precision::Absent,
            statistics_size_bytes: 0,
        };
        let expected = HashMap::from([(Path::from("test"), expected_entry)]);
        assert_eq!(entries, expected);
    }
}