use crate::cache::cache_unit::DefaultFilesMetadataCache;
use crate::cache::list_files_cache::ListFilesEntry;
use crate::cache::list_files_cache::TableScopedPath;
use crate::cache::{CacheAccessor, DefaultListFilesCache};
use datafusion_common::TableReference;
use datafusion_common::stats::Precision;
use datafusion_common::{Result, Statistics};
use object_store::ObjectMeta;
use object_store::path::Path;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;
pub use super::list_files_cache::{
DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
};
/// A cache of [`Statistics`] keyed by object-store [`Path`].
///
/// This is a [`CacheAccessor`] whose values are `Arc<Statistics>` and whose
/// `Extra` argument type is [`ObjectMeta`]. How `Extra` is used (for example
/// for validating entries) is up to the implementation.
pub trait FileStatisticsCache:
CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>
{
/// Returns a map from path to [`FileStatisticsCacheEntry`] describing the
/// cache's contents (presumably one entry per cached item; the exact
/// snapshot semantics are defined by the implementation).
fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
}
/// Summary of a single entry, as returned by
/// [`FileStatisticsCache::list_entries`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileStatisticsCacheEntry {
/// Object-store metadata associated with the entry.
pub object_meta: ObjectMeta,
/// Number of rows, together with how precise that number is.
pub num_rows: Precision<usize>,
/// Number of columns.
pub num_columns: usize,
/// Table size in bytes, together with how precise that number is.
pub table_size_bytes: Precision<usize>,
/// Size in bytes of the statistics themselves (presumably their in-memory
/// footprint; confirm against the code that fills this in).
pub statistics_size_bytes: usize,
}
/// A cache of file listings keyed by [`TableScopedPath`].
///
/// This is a [`CacheAccessor`] whose values are `Arc<Vec<ObjectMeta>>` and
/// whose `Extra` argument type is `Option<Path>`; the meaning of that optional
/// path is defined by the implementation.
///
/// In addition to the accessor operations, implementations expose a
/// configurable limit and TTL, which [`CacheManager::try_new`] updates from
/// its config.
pub trait ListFilesCache:
CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
{
/// Returns the cache's current limit.
fn cache_limit(&self) -> usize;
/// Returns the cache's current TTL, or `None` if it has none.
fn cache_ttl(&self) -> Option<Duration>;
/// Replaces the cache's limit. Takes `&self`, so implementations need
/// interior mutability.
fn update_cache_limit(&self, limit: usize);
/// Replaces the cache's TTL. Takes `&self`, so implementations need
/// interior mutability.
fn update_cache_ttl(&self, ttl: Option<Duration>);
/// Returns a map from key to [`ListFilesEntry`] describing the cache's
/// contents.
fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;
/// Drops the entries associated with `table_ref`. The matching rules,
/// including how a `None` reference is treated, are defined by the
/// implementation.
fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
}
/// A piece of file metadata that can be stored in a [`FileMetadataCache`].
///
/// Values are held as `Arc<dyn FileMetadata>`, so the trait requires
/// `Any + Send + Sync`.
pub trait FileMetadata: Any + Send + Sync {
/// Returns `self` as a `&dyn Any`, allowing callers to downcast to the
/// concrete metadata type.
fn as_any(&self) -> &dyn Any;
/// Returns the memory size of this metadata (presumably in bytes; confirm
/// against implementations).
fn memory_size(&self) -> usize;
/// Returns additional string key/value information about this metadata.
fn extra_info(&self) -> HashMap<String, String>;
}
/// A cache of [`FileMetadata`] keyed by [`ObjectMeta`].
///
/// This is a [`CacheAccessor`] whose values are `Arc<dyn FileMetadata>` and
/// whose `Extra` argument type is [`ObjectMeta`].
///
/// [`CacheManager::try_new`] updates the limit of the cache it ends up using
/// via [`FileMetadataCache::update_cache_limit`].
pub trait FileMetadataCache:
CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
{
/// Returns the cache's current limit.
fn cache_limit(&self) -> usize;
/// Replaces the cache's limit. Takes `&self`, so implementations need
/// interior mutability.
fn update_cache_limit(&self, limit: usize);
/// Returns a map from path to [`FileMetadataCacheEntry`] describing the
/// cache's contents.
fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
}
/// Summary of a single entry, as returned by
/// [`FileMetadataCache::list_entries`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMetadataCacheEntry {
/// Object-store metadata associated with the entry.
pub object_meta: ObjectMeta,
/// Size of the entry in bytes.
pub size_bytes: usize,
/// Hit counter for the entry (presumably the number of cache hits; confirm
/// against the cache implementation).
pub hits: usize,
/// Additional string key/value information (presumably taken from
/// `FileMetadata::extra_info`; confirm against the cache implementation).
pub extra: HashMap<String, String>,
}
/// Debug output for any file statistics cache: its name and its length.
impl Debug for dyn FileStatisticsCache {
    /// Writes `Cache name: <name> with length: <len>` to `f`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!(
            "Cache name: {} with length: {}",
            self.name(),
            self.len()
        ))
    }
}
/// Debug output for any list-files cache: its name and its length.
impl Debug for dyn ListFilesCache {
    /// Writes `Cache name: <name> with length: <len>` to `f`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!(
            "Cache name: {} with length: {}",
            self.name(),
            self.len()
        ))
    }
}
/// Debug output for any file metadata cache: its name and its length.
impl Debug for dyn FileMetadataCache {
    /// Writes `Cache name: <name> with length: <len>` to `f`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!(
            "Cache name: {} with length: {}",
            self.name(),
            self.len()
        ))
    }
}
/// Owns the caches assembled by [`CacheManager::try_new`] from a
/// [`CacheManagerConfig`] and hands out shared (`Arc`) handles to them.
#[derive(Debug)]
pub struct CacheManager {
/// Optional file statistics cache; `None` when the config supplied none.
file_statistic_cache: Option<Arc<dyn FileStatisticsCache>>,
/// Optional list-files cache; `None` when the configured limit is `0`.
list_files_cache: Option<Arc<dyn ListFilesCache>>,
/// File metadata cache; always present (a default one is created when the
/// config supplies none).
file_metadata_cache: Arc<dyn FileMetadataCache>,
}
impl CacheManager {
    /// Builds a [`CacheManager`] from `config`.
    ///
    /// * The file statistics cache is shared from the config unchanged
    ///   (possibly `None`).
    /// * The list-files cache is disabled (`None`) when
    ///   `config.list_files_cache_limit` is `0`, even if the config supplies
    ///   one. Otherwise a supplied cache is reused after its limit is updated;
    ///   its TTL is only overwritten when the config carries a `Some` TTL.
    ///   When no cache is supplied, a [`DefaultListFilesCache`] is created
    ///   from the configured limit and TTL.
    /// * The file metadata cache is the supplied one, or a new
    ///   [`DefaultFilesMetadataCache`]; in both cases its limit is then set to
    ///   `config.metadata_cache_limit`.
    ///
    /// The implementation shown here always returns `Ok`.
    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
        let file_statistic_cache = config.table_files_statistics_cache.clone();

        let list_files_cache: Option<Arc<dyn ListFilesCache>> =
            if config.list_files_cache_limit == 0 {
                // A zero limit turns the list-files cache off entirely.
                None
            } else if let Some(supplied) = config.list_files_cache.as_ref() {
                // Bring the caller-provided cache in line with the config.
                supplied.update_cache_limit(config.list_files_cache_limit);
                // A `None` TTL in the config leaves the cache's own TTL alone.
                if let Some(ttl) = config.list_files_cache_ttl {
                    supplied.update_cache_ttl(Some(ttl));
                }
                Some(Arc::clone(supplied))
            } else {
                let created: Arc<dyn ListFilesCache> = Arc::new(DefaultListFilesCache::new(
                    config.list_files_cache_limit,
                    config.list_files_cache_ttl,
                ));
                Some(created)
            };

        let file_metadata_cache: Arc<dyn FileMetadataCache> = match &config.file_metadata_cache {
            Some(supplied) => Arc::clone(supplied),
            None => Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit)),
        };
        // Applied unconditionally so a caller-provided cache picks up the
        // configured limit as well.
        file_metadata_cache.update_cache_limit(config.metadata_cache_limit);

        let manager = CacheManager {
            file_statistic_cache,
            list_files_cache,
            file_metadata_cache,
        };
        Ok(Arc::new(manager))
    }

    /// Returns a shared handle to the file statistics cache, if there is one.
    pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
        self.file_statistic_cache.as_ref().map(Arc::clone)
    }

    /// Returns a shared handle to the list-files cache, if there is one.
    pub fn get_list_files_cache(&self) -> Option<Arc<dyn ListFilesCache>> {
        self.list_files_cache.as_ref().map(Arc::clone)
    }

    /// Returns the list-files cache's limit, or `0` when there is no
    /// list-files cache.
    pub fn get_list_files_cache_limit(&self) -> usize {
        match &self.list_files_cache {
            Some(cache) => cache.cache_limit(),
            None => 0,
        }
    }

    /// Returns the list-files cache's TTL, or `None` when there is no
    /// list-files cache or the cache has no TTL.
    pub fn get_list_files_cache_ttl(&self) -> Option<Duration> {
        self.list_files_cache.as_ref()?.cache_ttl()
    }

    /// Returns a shared handle to the file metadata cache.
    pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
        self.file_metadata_cache.clone()
    }

    /// Returns the file metadata cache's limit.
    pub fn get_metadata_cache_limit(&self) -> usize {
        self.file_metadata_cache.cache_limit()
    }
}
/// Default value of [`CacheManagerConfig::metadata_cache_limit`]:
/// `50 * 1024 * 1024` (presumably bytes, i.e. 50 MiB; confirm against the
/// metadata cache implementation).
pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024;
/// Configuration consumed by [`CacheManager::try_new`].
///
/// Each cache may be supplied by the caller; the limit/TTL fields are applied
/// to the supplied or default-created caches when the manager is built.
#[derive(Clone)]
pub struct CacheManagerConfig {
/// Optional file statistics cache. Used as-is; no default is created when
/// this is `None`.
pub table_files_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
/// Optional caller-provided list-files cache. When `None` and the limit is
/// non-zero, `CacheManager::try_new` creates a `DefaultListFilesCache`.
pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
/// Limit applied to the list-files cache. A value of `0` makes
/// `CacheManager::try_new` run without a list-files cache, even if one is
/// supplied.
pub list_files_cache_limit: usize,
/// TTL for the list-files cache. For a caller-provided cache it is applied
/// only when `Some`; for a default-created cache it is passed through
/// unchanged.
pub list_files_cache_ttl: Option<Duration>,
/// Optional caller-provided file metadata cache. When `None`,
/// `CacheManager::try_new` creates a `DefaultFilesMetadataCache`.
pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
/// Limit applied to the file metadata cache, whether supplied or
/// default-created.
pub metadata_cache_limit: usize,
}
impl Default for CacheManagerConfig {
    /// Returns a config with no caller-provided caches and the default
    /// list-files limit, list-files TTL and metadata cache limit.
    fn default() -> Self {
        Self {
            table_files_statistics_cache: None,
            list_files_cache: None,
            list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
            list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
            file_metadata_cache: None,
            metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
        }
    }
}
impl CacheManagerConfig {
    /// Returns the config with its file statistics cache replaced by `cache`.
    pub fn with_files_statistics_cache(
        self,
        cache: Option<Arc<dyn FileStatisticsCache>>,
    ) -> Self {
        Self {
            table_files_statistics_cache: cache,
            ..self
        }
    }

    /// Returns the config with its list-files cache replaced by `cache`.
    pub fn with_list_files_cache(self, cache: Option<Arc<dyn ListFilesCache>>) -> Self {
        Self {
            list_files_cache: cache,
            ..self
        }
    }

    /// Returns the config with its list-files cache limit set to `limit`.
    pub fn with_list_files_cache_limit(self, limit: usize) -> Self {
        Self {
            list_files_cache_limit: limit,
            ..self
        }
    }

    /// Returns the config with its list-files cache TTL set to `ttl`.
    pub fn with_list_files_cache_ttl(self, ttl: Option<Duration>) -> Self {
        Self {
            list_files_cache_ttl: ttl,
            ..self
        }
    }

    /// Returns the config with its file metadata cache replaced by `cache`.
    pub fn with_file_metadata_cache(
        self,
        cache: Option<Arc<dyn FileMetadataCache>>,
    ) -> Self {
        Self {
            file_metadata_cache: cache,
            ..self
        }
    }

    /// Returns the config with its metadata cache limit set to `limit`.
    pub fn with_metadata_cache_limit(self, limit: usize) -> Self {
        Self {
            metadata_cache_limit: limit,
            ..self
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::DefaultListFilesCache;
    use std::time::Duration;

    /// A caller-provided cache keeps its own TTL when the config does not
    /// set one.
    #[test]
    fn test_ttl_preserved_when_not_set_in_config() {
        let one_second = Duration::from_secs(1);
        let supplied = DefaultListFilesCache::new(1024, Some(one_second));
        assert_eq!(
            supplied.cache_ttl(),
            Some(one_second),
            "Cache should have TTL = 1 second initially"
        );

        // The default config leaves the list-files TTL untouched.
        let config =
            CacheManagerConfig::default().with_list_files_cache(Some(Arc::new(supplied)));
        let manager = CacheManager::try_new(&config).unwrap();

        let cache_ttl = manager.get_list_files_cache().unwrap().cache_ttl();
        assert!(
            cache_ttl.is_some(),
            "TTL should be preserved when not set in config. Expected Some(Duration::from_secs(1)), got {cache_ttl:?}"
        );
        assert_eq!(cache_ttl, Some(one_second), "TTL should be exactly 1 second");
    }

    /// A TTL set explicitly in the config replaces the TTL of a
    /// caller-provided cache.
    #[test]
    fn test_ttl_overridden_when_set_in_config() {
        let supplied = DefaultListFilesCache::new(1024, Some(Duration::from_secs(1)));
        let sixty_seconds = Duration::from_secs(60);

        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(supplied)))
            .with_list_files_cache_ttl(Some(sixty_seconds));
        let manager = CacheManager::try_new(&config).unwrap();

        let cache_ttl = manager.get_list_files_cache().unwrap().cache_ttl();
        assert_eq!(
            cache_ttl,
            Some(sixty_seconds),
            "TTL should be overridden to 60 seconds when set in config"
        );
    }
}