use crate::cache::cache_unit::DefaultFilesMetadataCache;
use crate::cache::CacheAccessor;
use datafusion_common::{Result, Statistics};
use object_store::path::Path;
use object_store::ObjectMeta;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
/// Shared, dynamically dispatched cache of per-file statistics.
///
/// The cache is a [`CacheAccessor`] keyed by [`Path`] whose values are
/// `Arc<Statistics>`; the accessor's `Extra` type is [`ObjectMeta`].
pub type FileStatisticsCache =
Arc<dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>>;
/// Shared, dynamically dispatched cache of file listings.
///
/// The cache is a [`CacheAccessor`] keyed by [`Path`] whose values are
/// `Arc<Vec<ObjectMeta>>`; the accessor's `Extra` type is [`ObjectMeta`].
pub type ListFilesCache =
Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
/// A piece of file metadata that can be stored in a [`FileMetadataCache`].
///
/// Implementors must be `Any + Send + Sync`, so values can be shared across
/// threads behind an `Arc<dyn FileMetadata>`.
pub trait FileMetadata: Any + Send + Sync {
/// Returns this value as `&dyn Any`, which permits downcasting to the
/// concrete metadata type.
fn as_any(&self) -> &dyn Any;
/// Returns the memory size of this metadata.
// NOTE(review): presumably a byte count — confirm with implementors.
fn memory_size(&self) -> usize;
/// Returns additional information about this metadata as string
/// key/value pairs. The set of keys is defined by each implementor.
fn extra_info(&self) -> HashMap<String, String>;
}
/// A cache of file metadata with a configurable limit.
///
/// This extends [`CacheAccessor`] keyed by [`ObjectMeta`], storing
/// `Arc<dyn FileMetadata>` values, with [`ObjectMeta`] as the `Extra` type.
pub trait FileMetadataCache:
CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
{
/// Returns the cache's current limit.
// NOTE(review): presumably a memory limit in bytes — confirm with implementors.
fn cache_limit(&self) -> usize;
/// Replaces the cache's limit with `limit`.
///
/// The receiver is `&self`, so implementors need interior mutability to
/// apply the change.
fn update_cache_limit(&self, limit: usize);
/// Returns a map from [`Path`] to a [`FileMetadataCacheEntry`] describing
/// the cache's entries.
fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
}
/// Description of a single file metadata cache entry, as returned by
/// [`FileMetadataCache::list_entries`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMetadataCacheEntry {
/// Object store metadata associated with the entry.
pub object_meta: ObjectMeta,
/// Size of the entry, in bytes.
pub size_bytes: usize,
/// Hit counter for the entry.
// NOTE(review): presumably the number of cache reads served — confirm
// against the cache implementation.
pub hits: usize,
/// Additional string key/value information about the entry.
// NOTE(review): presumably sourced from `FileMetadata::extra_info` — TODO confirm.
pub extra: HashMap<String, String>,
}
/// `Debug` support for the trait object behind [`FileStatisticsCache`].
impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
    /// Writes the cache's name and its current length; the cached entries
    /// themselves are not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let cache_name = self.name();
        let entry_count = self.len();
        write!(
            f,
            "Cache name: {} with length: {}",
            cache_name, entry_count
        )
    }
}
/// `Debug` support for the trait object behind [`ListFilesCache`].
impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta> {
    /// Writes the cache's name and its current length; the cached listings
    /// themselves are not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let cache_name = self.name();
        let entry_count = self.len();
        write!(
            f,
            "Cache name: {} with length: {}",
            cache_name, entry_count
        )
    }
}
/// `Debug` support for [`FileMetadataCache`] trait objects.
impl Debug for dyn FileMetadataCache {
    /// Writes the cache's name and its current length; the cached metadata
    /// itself is not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let cache_name = self.name();
        let entry_count = self.len();
        write!(
            f,
            "Cache name: {} with length: {}",
            cache_name, entry_count
        )
    }
}
/// Holds the caches configured through a [`CacheManagerConfig`].
///
/// The statistics and list-files caches are optional; a file metadata cache
/// is always present (see [`CacheManager::try_new`]).
#[derive(Debug)]
pub struct CacheManager {
/// Optional per-file statistics cache.
file_statistic_cache: Option<FileStatisticsCache>,
/// Optional file listing cache.
list_files_cache: Option<ListFilesCache>,
/// File metadata cache; never absent.
file_metadata_cache: Arc<dyn FileMetadataCache>,
}
impl CacheManager {
    /// Builds a shared [`CacheManager`] from `config`.
    ///
    /// The statistics and list-files caches are taken from `config` as-is
    /// (either may be `None`). When `config` carries no file metadata cache, a
    /// [`DefaultFilesMetadataCache`] is created with
    /// `config.metadata_cache_limit`. In both cases the limit from `config` is
    /// then applied to the metadata cache through `update_cache_limit`.
    ///
    /// This implementation always returns `Ok`.
    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
        let file_statistic_cache = config.table_files_statistics_cache.clone();
        let list_files_cache = config.list_files_cache.clone();

        let file_metadata_cache: Arc<dyn FileMetadataCache> =
            match &config.file_metadata_cache {
                Some(existing) => Arc::clone(existing),
                None => Arc::new(DefaultFilesMetadataCache::new(
                    config.metadata_cache_limit,
                )),
            };
        // Apply the configured limit unconditionally, so a caller-supplied
        // cache also picks it up.
        file_metadata_cache.update_cache_limit(config.metadata_cache_limit);

        let manager = Self {
            file_statistic_cache,
            list_files_cache,
            file_metadata_cache,
        };
        Ok(Arc::new(manager))
    }

    /// Returns a new handle to the file statistics cache, or `None` if none
    /// was configured.
    pub fn get_file_statistic_cache(&self) -> Option<FileStatisticsCache> {
        self.file_statistic_cache.as_ref().map(Arc::clone)
    }

    /// Returns a new handle to the list-files cache, or `None` if none was
    /// configured.
    pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
        self.list_files_cache.as_ref().map(Arc::clone)
    }

    /// Returns a new handle to the file metadata cache.
    pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
        let cache = &self.file_metadata_cache;
        Arc::clone(cache)
    }

    /// Returns the limit currently reported by the file metadata cache.
    pub fn get_metadata_cache_limit(&self) -> usize {
        let cache = &self.file_metadata_cache;
        cache.cache_limit()
    }
}
/// Default value of `CacheManagerConfig::metadata_cache_limit`:
/// 50 * 1024 * 1024 = 52,428,800.
// NOTE(review): presumably a byte count (i.e. 50 MiB) — confirm against the
// metadata cache implementation.
const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024;
/// Configuration consumed by [`CacheManager::try_new`].
#[derive(Clone)]
pub struct CacheManagerConfig {
/// Optional per-file statistics cache. When `None`, the resulting
/// [`CacheManager`] has no statistics cache.
pub table_files_statistics_cache: Option<FileStatisticsCache>,
/// Optional file listing cache. When `None`, the resulting
/// [`CacheManager`] has no list-files cache.
pub list_files_cache: Option<ListFilesCache>,
/// Optional file metadata cache. When `None`, [`CacheManager::try_new`]
/// creates a `DefaultFilesMetadataCache` using `metadata_cache_limit`.
pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
/// Limit for the file metadata cache. [`CacheManager::try_new`] applies it
/// to the metadata cache via `update_cache_limit`, including when the
/// cache was supplied through `file_metadata_cache`.
pub metadata_cache_limit: usize,
}
impl Default for CacheManagerConfig {
    /// Returns a configuration with no caches set and the metadata cache
    /// limit set to `DEFAULT_METADATA_CACHE_LIMIT`.
    fn default() -> Self {
        Self {
            table_files_statistics_cache: None,
            list_files_cache: None,
            file_metadata_cache: None,
            metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
        }
    }
}
impl CacheManagerConfig {
    /// Returns the configuration with `table_files_statistics_cache` replaced
    /// by `cache`; passing `None` clears it.
    pub fn with_files_statistics_cache(
        self,
        cache: Option<FileStatisticsCache>,
    ) -> Self {
        Self {
            table_files_statistics_cache: cache,
            ..self
        }
    }

    /// Returns the configuration with `list_files_cache` replaced by `cache`;
    /// passing `None` clears it.
    pub fn with_list_files_cache(self, cache: Option<ListFilesCache>) -> Self {
        Self {
            list_files_cache: cache,
            ..self
        }
    }

    /// Returns the configuration with `file_metadata_cache` replaced by
    /// `cache`; passing `None` clears it.
    pub fn with_file_metadata_cache(
        self,
        cache: Option<Arc<dyn FileMetadataCache>>,
    ) -> Self {
        Self {
            file_metadata_cache: cache,
            ..self
        }
    }

    /// Returns the configuration with `metadata_cache_limit` set to `limit`.
    pub fn with_metadata_cache_limit(self, limit: usize) -> Self {
        Self {
            metadata_cache_limit: limit,
            ..self
        }
    }
}