datafusion_execution/cache/
cache_unit.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::cache::CacheAccessor;
22use crate::cache::cache_manager::{FileStatisticsCache, FileStatisticsCacheEntry};
23
24use datafusion_common::Statistics;
25
26use dashmap::DashMap;
27use object_store::ObjectMeta;
28use object_store::path::Path;
29
30pub use crate::cache::DefaultFilesMetadataCache;
31
/// Default implementation of [`FileStatisticsCache`]
///
/// Stores collected statistics for files, keyed by file [`Path`], together
/// with the [`ObjectMeta`] that was supplied when the statistics were stored.
///
/// A cached entry is treated as stale when the file's size or last
/// modification time differs from the stored metadata: `get_with_extra`
/// then returns `None` (the stale entry itself is not removed).
///
/// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache
#[derive(Default)]
pub struct DefaultFileStatisticsCache {
    /// File path -> (metadata at insertion time, collected statistics).
    statistics: DashMap<Path, (ObjectMeta, Arc<Statistics>)>,
}
43
44impl FileStatisticsCache for DefaultFileStatisticsCache {
45    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry> {
46        let mut entries = HashMap::<Path, FileStatisticsCacheEntry>::new();
47
48        for entry in &self.statistics {
49            let path = entry.key();
50            let (object_meta, stats) = entry.value();
51            entries.insert(
52                path.clone(),
53                FileStatisticsCacheEntry {
54                    object_meta: object_meta.clone(),
55                    num_rows: stats.num_rows,
56                    num_columns: stats.column_statistics.len(),
57                    table_size_bytes: stats.total_byte_size,
58                    statistics_size_bytes: 0, // TODO: set to the real size in the future
59                },
60            );
61        }
62
63        entries
64    }
65}
66
67impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
68    type Extra = ObjectMeta;
69
70    /// Get `Statistics` for file location.
71    fn get(&self, k: &Path) -> Option<Arc<Statistics>> {
72        self.statistics
73            .get(k)
74            .map(|s| Some(Arc::clone(&s.value().1)))
75            .unwrap_or(None)
76    }
77
78    /// Get `Statistics` for file location. Returns None if file has changed or not found.
79    fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Arc<Statistics>> {
80        self.statistics
81            .get(k)
82            .map(|s| {
83                let (saved_meta, statistics) = s.value();
84                if saved_meta.size != e.size
85                    || saved_meta.last_modified != e.last_modified
86                {
87                    // file has changed
88                    None
89                } else {
90                    Some(Arc::clone(statistics))
91                }
92            })
93            .unwrap_or(None)
94    }
95
96    /// Save collected file statistics
97    fn put(&self, _key: &Path, _value: Arc<Statistics>) -> Option<Arc<Statistics>> {
98        panic!("Put cache in DefaultFileStatisticsCache without Extra not supported.")
99    }
100
101    fn put_with_extra(
102        &self,
103        key: &Path,
104        value: Arc<Statistics>,
105        e: &Self::Extra,
106    ) -> Option<Arc<Statistics>> {
107        self.statistics
108            .insert(key.clone(), (e.clone(), value))
109            .map(|x| x.1)
110    }
111
112    fn remove(&self, k: &Path) -> Option<Arc<Statistics>> {
113        self.statistics.remove(k).map(|x| x.1.1)
114    }
115
116    fn contains_key(&self, k: &Path) -> bool {
117        self.statistics.contains_key(k)
118    }
119
120    fn len(&self) -> usize {
121        self.statistics.len()
122    }
123
124    fn clear(&self) {
125        self.statistics.clear()
126    }
127    fn name(&self) -> String {
128        "DefaultFileStatisticsCache".to_string()
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::CacheAccessor;
    use crate::cache::cache_manager::{FileStatisticsCache, FileStatisticsCacheEntry};
    use crate::cache::cache_unit::DefaultFileStatisticsCache;
    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use chrono::DateTime;
    use datafusion_common::Statistics;
    use datafusion_common::stats::Precision;
    use object_store::ObjectMeta;
    use object_store::path::Path;

    /// Exercises the `put_with_extra` / `get_with_extra` staleness rules and
    /// the summary produced by `list_entries`.
    #[test]
    fn test_statistics_cache() {
        let original = ObjectMeta {
            location: Path::from("test"),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size: 1024,
            e_tag: None,
            version: None,
        };
        let cache = DefaultFileStatisticsCache::default();

        // nothing has been cached yet
        assert!(cache.get_with_extra(&original.location, &original).is_none());

        let schema = Schema::new(vec![Field::new(
            "test_column",
            DataType::Timestamp(TimeUnit::Second, None),
            false,
        )]);
        let stats: Arc<Statistics> = Statistics::new_unknown(&schema).into();
        cache.put_with_extra(&original.location, stats, &original);
        assert!(cache.get_with_extra(&original.location, &original).is_some());

        // Each variant differs from the cached metadata in exactly one way
        // (size, last_modified, location) and must therefore miss the cache.
        let resized = ObjectMeta {
            size: 2048,
            ..original.clone()
        };
        let touched = ObjectMeta {
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
                .unwrap()
                .into(),
            ..original.clone()
        };
        let other_file = ObjectMeta {
            location: Path::from("test2"),
            ..original.clone()
        };
        for changed in [&resized, &touched, &other_file] {
            assert!(cache.get_with_extra(&changed.location, changed).is_none());
        }

        // only the originally inserted entry is listed
        let expected = HashMap::from([(
            Path::from("test"),
            FileStatisticsCacheEntry {
                object_meta: original.clone(),
                num_rows: Precision::Absent,
                num_columns: 1,
                table_size_bytes: Precision::Absent,
                statistics_size_bytes: 0,
            },
        )]);
        assert_eq!(cache.list_entries(), expected);
    }
}