use std::collections::HashMap;
use std::sync::Arc;
use crate::cache::CacheAccessor;
use crate::cache::cache_manager::{FileStatisticsCache, FileStatisticsCacheEntry};
use datafusion_common::Statistics;
use dashmap::DashMap;
use object_store::ObjectMeta;
use object_store::path::Path;
pub use crate::cache::DefaultFilesMetadataCache;
/// Cache of per-file [`Statistics`], keyed by the file's object-store [`Path`].
///
/// Each entry also keeps the [`ObjectMeta`] that was supplied when the
/// statistics were inserted. `get_with_extra` compares that saved metadata
/// (size and last-modified time) with the caller's metadata and only returns
/// the statistics when both match.
#[derive(Default)]
pub struct DefaultFileStatisticsCache {
// Path -> (metadata supplied at insert time, cached statistics).
statistics: DashMap<Path, (ObjectMeta, Arc<Statistics>)>,
}
impl FileStatisticsCache for DefaultFileStatisticsCache {
    /// Builds a new map describing every entry currently held by the cache.
    ///
    /// The returned map is keyed by a clone of each cached path. Every value
    /// carries a clone of the stored [`ObjectMeta`] together with figures read
    /// from the cached statistics: the row count, the number of column
    /// statistics, and the total byte size.
    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry> {
        self.statistics
            .iter()
            .map(|cached| {
                let (meta, file_stats) = cached.value();
                let summary = FileStatisticsCacheEntry {
                    object_meta: meta.clone(),
                    num_rows: file_stats.num_rows,
                    num_columns: file_stats.column_statistics.len(),
                    table_size_bytes: file_stats.total_byte_size,
                    // The size of the statistics themselves is not computed
                    // here; it is always reported as 0.
                    statistics_size_bytes: 0,
                };
                (cached.key().clone(), summary)
            })
            .collect()
    }
}
impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
    /// File metadata stored alongside the statistics; `get_with_extra` uses it
    /// to decide whether a cached entry still matches the file.
    type Extra = ObjectMeta;

    /// Returns the statistics cached for `k`, or `None` if the path is absent.
    ///
    /// No metadata comparison is performed here; use `get_with_extra` to have
    /// the entry checked against the file's current metadata.
    fn get(&self, k: &Path) -> Option<Arc<Statistics>> {
        self.statistics
            .get(k)
            .map(|entry| Arc::clone(&entry.value().1))
    }

    /// Returns the statistics cached for `k` only when the saved metadata
    /// agrees with `e`.
    ///
    /// The entry is returned when both the `size` and the `last_modified`
    /// fields of the saved [`ObjectMeta`] equal those of `e`. `None` is
    /// returned when the path is absent or when either field differs.
    fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Arc<Statistics>> {
        self.statistics.get(k).and_then(|entry| {
            let (saved_meta, statistics) = entry.value();
            let is_fresh = saved_meta.size == e.size
                && saved_meta.last_modified == e.last_modified;
            is_fresh.then(|| Arc::clone(statistics))
        })
    }

    /// Not supported: entries need an [`ObjectMeta`], so callers must use
    /// `put_with_extra` instead.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn put(&self, _key: &Path, _value: Arc<Statistics>) -> Option<Arc<Statistics>> {
        panic!("Put cache in DefaultFileStatisticsCache without Extra not supported.")
    }

    /// Stores `value` for `key` together with a clone of the metadata `e`.
    ///
    /// Returns the statistics previously stored under `key`, if any.
    fn put_with_extra(
        &self,
        key: &Path,
        value: Arc<Statistics>,
        e: &Self::Extra,
    ) -> Option<Arc<Statistics>> {
        self.statistics
            .insert(key.clone(), (e.clone(), value))
            .map(|(_previous_meta, previous_stats)| previous_stats)
    }

    /// Removes the entry for `k` and returns its statistics, if it existed.
    fn remove(&self, k: &Path) -> Option<Arc<Statistics>> {
        self.statistics
            .remove(k)
            .map(|(_path, (_meta, removed_stats))| removed_stats)
    }

    /// Reports whether an entry is stored for `k`.
    fn contains_key(&self, k: &Path) -> bool {
        self.statistics.contains_key(k)
    }

    /// Returns the number of entries currently stored.
    fn len(&self) -> usize {
        self.statistics.len()
    }

    /// Removes every entry from the cache.
    fn clear(&self) {
        self.statistics.clear();
    }

    /// Returns the fixed name of this cache implementation.
    fn name(&self) -> String {
        "DefaultFileStatisticsCache".to_string()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::CacheAccessor;
    use crate::cache::cache_manager::{FileStatisticsCache, FileStatisticsCacheEntry};
    use crate::cache::cache_unit::DefaultFileStatisticsCache;
    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use chrono::DateTime;
    use datafusion_common::Statistics;
    use datafusion_common::stats::Precision;
    use object_store::ObjectMeta;
    use object_store::path::Path;

    /// Checks that lookups with metadata hit only when size and modification
    /// time match, miss for an unknown path, and that `list_entries` reports
    /// the single stored entry.
    #[test]
    fn test_statistics_cache() {
        let meta = ObjectMeta {
            location: Path::from("test"),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size: 1024,
            e_tag: None,
            version: None,
        };
        let cache = DefaultFileStatisticsCache::default();

        // Nothing has been stored yet.
        assert!(cache.get_with_extra(&meta.location, &meta).is_none());

        // Store unknown statistics for a single-column schema.
        let schema = Schema::new(vec![Field::new(
            "test_column",
            DataType::Timestamp(TimeUnit::Second, None),
            false,
        )]);
        let unknown_stats = Arc::new(Statistics::new_unknown(&schema));
        cache.put_with_extra(&meta.location, unknown_stats, &meta);

        // Same metadata: hit.
        assert!(cache.get_with_extra(&meta.location, &meta).is_some());

        // Different file size: miss.
        let resized = ObjectMeta {
            size: 2048,
            ..meta.clone()
        };
        assert!(cache.get_with_extra(&resized.location, &resized).is_none());

        // Different modification time: miss.
        let touched = ObjectMeta {
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
                .unwrap()
                .into(),
            ..meta.clone()
        };
        assert!(cache.get_with_extra(&touched.location, &touched).is_none());

        // Different path: miss.
        let relocated = ObjectMeta {
            location: Path::from("test2"),
            ..meta.clone()
        };
        assert!(
            cache
                .get_with_extra(&relocated.location, &relocated)
                .is_none()
        );

        // Only the original entry is listed.
        let mut expected = HashMap::new();
        expected.insert(
            Path::from("test"),
            FileStatisticsCacheEntry {
                object_meta: meta.clone(),
                num_rows: Precision::Absent,
                num_columns: 1,
                table_size_bytes: Precision::Absent,
                statistics_size_bytes: 0,
            },
        );
        let entries = cache.list_entries();
        assert_eq!(entries, expected);
    }
}