use crate::cache::CacheAccessor;
use crate::cache::DefaultListFilesCache;
use crate::cache::cache_unit::DefaultFilesMetadataCache;
use crate::cache::list_files_cache::ListFilesEntry;
use crate::cache::list_files_cache::TableScopedPath;
use datafusion_common::TableReference;
use datafusion_common::stats::Precision;
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use object_store::ObjectMeta;
use object_store::path::Path;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
pub use super::list_files_cache::{
DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
};
/// Value stored in a [`FileStatisticsCache`]: a file's statistics and optional
/// ordering, kept together with the [`ObjectMeta`] supplied when the entry was
/// built.
///
/// [`CachedFileMetadata::is_valid_for`] reports whether the stored `meta` still
/// agrees (in size and modification time) with freshly obtained metadata.
#[derive(Debug, Clone)]
pub struct CachedFileMetadata {
/// Object-store metadata recorded with this entry; its `size` and
/// `last_modified` are the fields `is_valid_for` compares.
pub meta: ObjectMeta,
/// Statistics for the file, shared behind an `Arc`.
pub statistics: Arc<Statistics>,
/// Ordering recorded for the file, or `None` when none was supplied.
pub ordering: Option<LexOrdering>,
}
impl CachedFileMetadata {
pub fn new(
meta: ObjectMeta,
statistics: Arc<Statistics>,
ordering: Option<LexOrdering>,
) -> Self {
Self {
meta,
statistics,
ordering,
}
}
pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool {
self.meta.size == current_meta.size
&& self.meta.last_modified == current_meta.last_modified
}
}
/// A [`CacheAccessor`] keyed by object-store [`Path`] whose values are
/// [`CachedFileMetadata`], extended with the ability to enumerate its contents.
pub trait FileStatisticsCache: CacheAccessor<Path, CachedFileMetadata> {
/// Returns a map from path to a [`FileStatisticsCacheEntry`] summary.
///
/// NOTE(review): presumably one summary per entry currently held by the
/// cache — confirm against the implementations.
fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
}
/// Summary of a single statistics-cache entry, as returned by
/// [`FileStatisticsCache::list_entries`].
///
/// NOTE(review): how each field is derived is decided by the cache
/// implementation, which is not visible here; the field notes below are based
/// on names and types and should be confirmed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileStatisticsCacheEntry {
/// Object-store metadata associated with the entry.
pub object_meta: ObjectMeta,
/// Row count together with its [`Precision`].
pub num_rows: Precision<usize>,
/// Number of columns.
pub num_columns: usize,
/// Table size in bytes together with its [`Precision`].
pub table_size_bytes: Precision<usize>,
/// Size of the cached statistics, in bytes.
pub statistics_size_bytes: usize,
/// Whether an ordering is recorded for the entry.
pub has_ordering: bool,
}
/// Value stored in a [`ListFilesCache`]: a list of [`ObjectMeta`] held behind
/// an `Arc`, so cloning a `CachedFileList` does not copy the list itself.
#[derive(Debug, Clone, PartialEq)]
pub struct CachedFileList {
/// The cached file metadata, shared by reference count.
pub files: Arc<Vec<ObjectMeta>>,
}
impl CachedFileList {
    /// Wraps `files` in an [`Arc`] so the list can be handed out without
    /// copying it.
    pub fn new(files: Vec<ObjectMeta>) -> Self {
        Self {
            files: Arc::new(files),
        }
    }

    /// Returns owned copies of the cached entries selected by `prefix`.
    ///
    /// * `Some(prefix)` keeps the entries whose location, viewed as a string,
    ///   starts with the prefix string; only matching entries are cloned.
    /// * `None` clones the whole list.
    ///
    /// NOTE(review): the comparison is on the raw path string, so a prefix of
    /// `a/b` also selects `a/bc/file`. Confirm whether segment-aware matching
    /// (e.g. `Path::prefix_matches`) is what callers expect.
    fn filter_by_prefix(&self, prefix: &Option<Path>) -> Vec<ObjectMeta> {
        match prefix {
            Some(prefix) => {
                let needle: &str = prefix.as_ref();
                self.files
                    .iter()
                    .filter(|meta| {
                        let location: &str = meta.location.as_ref();
                        location.starts_with(needle)
                    })
                    .cloned()
                    .collect()
            }
            None => (*self.files).clone(),
        }
    }

    /// Returns the cached files selected by `prefix`, behind an [`Arc`].
    ///
    /// With `None` the existing `Arc` is cloned, so no file metadata is
    /// copied. With `Some(_)` a new `Arc` is built from the filtered list; see
    /// `filter_by_prefix` for the matching rule.
    pub fn files_matching_prefix(&self, prefix: &Option<Path>) -> Arc<Vec<ObjectMeta>> {
        match prefix {
            None => Arc::clone(&self.files),
            // Forward the borrowed option directly: the helper already takes
            // `&Option<Path>`, so cloning the path to re-wrap it is wasted work.
            Some(_) => Arc::new(self.filter_by_prefix(prefix)),
        }
    }
}
impl Deref for CachedFileList {
type Target = Arc<Vec<ObjectMeta>>;
fn deref(&self) -> &Self::Target {
&self.files
}
}
impl From<Vec<ObjectMeta>> for CachedFileList {
fn from(files: Vec<ObjectMeta>) -> Self {
Self::new(files)
}
}
/// A [`CacheAccessor`] mapping a [`TableScopedPath`] to the [`CachedFileList`]
/// stored for it, with an adjustable limit and an optional time-to-live.
///
/// All methods take `&self`; [`CacheManager::try_new`] calls the `update_*`
/// methods through a shared `Arc`.
pub trait ListFilesCache: CacheAccessor<TableScopedPath, CachedFileList> {
/// Returns the limit currently configured for this cache.
///
/// NOTE(review): presumably a memory budget in bytes (the default constant
/// is named `DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT`) — confirm.
fn cache_limit(&self) -> usize;
/// Returns the configured time-to-live, or `None` when no TTL is set.
fn cache_ttl(&self) -> Option<Duration>;
/// Replaces the cache limit with `limit`.
fn update_cache_limit(&self, limit: usize);
/// Replaces the time-to-live with `ttl`.
fn update_cache_ttl(&self, ttl: Option<Duration>);
/// Returns a map from cache key to a [`ListFilesEntry`].
///
/// NOTE(review): presumably one item per entry currently cached — confirm.
fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;
/// NOTE(review): presumably removes every entry scoped to `table_ref`; how
/// `None` is treated is not visible from this file — confirm against the
/// implementations.
fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
}
/// Type-erased, thread-shareable (`Send + Sync`) file metadata that can be
/// stored in a [`FileMetadataCache`] as `Arc<dyn FileMetadata>`.
pub trait FileMetadata: Any + Send + Sync {
/// Returns `self` as [`Any`], which lets callers downcast to the concrete
/// metadata type.
fn as_any(&self) -> &dyn Any;
/// Returns the size of this metadata.
///
/// NOTE(review): presumably the in-memory footprint in bytes — confirm.
fn memory_size(&self) -> usize;
/// Returns additional string key/value details about this metadata.
///
/// NOTE(review): presumably what ends up in `FileMetadataCacheEntry::extra`
/// (same type) — confirm.
fn extra_info(&self) -> HashMap<String, String>;
}
/// Value stored in a [`FileMetadataCache`]: type-erased file metadata kept
/// together with the [`ObjectMeta`] supplied when the entry was built.
///
/// `Debug` is implemented by hand for this type because the [`FileMetadata`]
/// trait does not require `Debug`.
#[derive(Clone)]
pub struct CachedFileMetadataEntry {
/// Object-store metadata recorded with this entry; its `size` and
/// `last_modified` are the fields `is_valid_for` compares.
pub meta: ObjectMeta,
/// The cached metadata itself, shared behind an `Arc`.
pub file_metadata: Arc<dyn FileMetadata>,
}
impl CachedFileMetadataEntry {
pub fn new(meta: ObjectMeta, file_metadata: Arc<dyn FileMetadata>) -> Self {
Self {
meta,
file_metadata,
}
}
pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool {
self.meta.size == current_meta.size
&& self.meta.last_modified == current_meta.last_modified
}
}
impl Debug for CachedFileMetadataEntry {
    /// Formats the entry as a struct with two fields: `meta` and the
    /// `memory_size` reported by the cached metadata. The metadata object
    /// itself is not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("CachedFileMetadataEntry");
        out.field("meta", &self.meta);
        let memory_size = self.file_metadata.memory_size();
        out.field("memory_size", &memory_size);
        out.finish()
    }
}
/// A [`CacheAccessor`] keyed by object-store [`Path`] whose values are
/// [`CachedFileMetadataEntry`], with an adjustable limit.
///
/// [`CacheManager::try_new`] calls `update_cache_limit` through a shared `Arc`.
pub trait FileMetadataCache: CacheAccessor<Path, CachedFileMetadataEntry> {
/// Returns the limit currently configured for this cache.
///
/// NOTE(review): presumably a memory budget in bytes — confirm.
fn cache_limit(&self) -> usize;
/// Replaces the cache limit with `limit`.
fn update_cache_limit(&self, limit: usize);
/// Returns a map from path to a [`FileMetadataCacheEntry`] summary.
///
/// NOTE(review): presumably one summary per entry currently cached — confirm.
fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
}
/// Summary of a single metadata-cache entry, as returned by
/// [`FileMetadataCache::list_entries`].
///
/// NOTE(review): how each field is populated is decided by the cache
/// implementation, which is not visible here; the field notes below are based
/// on names and types and should be confirmed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMetadataCacheEntry {
/// Object-store metadata associated with the entry.
pub object_meta: ObjectMeta,
/// Size of the cached metadata, in bytes.
pub size_bytes: usize,
/// Number of cache hits recorded for the entry.
pub hits: usize,
/// Additional string key/value details (same type as
/// `FileMetadata::extra_info` returns).
pub extra: HashMap<String, String>,
}
impl Debug for dyn FileStatisticsCache {
    /// Writes a one-line summary made of the cache's `name()` and `len()`;
    /// the cached entries themselves are not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let name = self.name();
        let len = self.len();
        write!(f, "Cache name: {name} with length: {len}")
    }
}
impl Debug for dyn ListFilesCache {
    /// Writes a one-line summary made of the cache's `name()` and `len()`;
    /// the cached entries themselves are not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let name = self.name();
        let len = self.len();
        write!(f, "Cache name: {name} with length: {len}")
    }
}
impl Debug for dyn FileMetadataCache {
    /// Writes a one-line summary made of the cache's `name()` and `len()`;
    /// the cached entries themselves are not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let name = self.name();
        let len = self.len();
        write!(f, "Cache name: {name} with length: {len}")
    }
}
/// Holds the caches assembled from a [`CacheManagerConfig`] by
/// [`CacheManager::try_new`] and hands out shared handles to them.
///
/// The derived `Debug` relies on the hand-written `Debug` impls for the
/// `dyn` cache traits in this file.
#[derive(Debug)]
pub struct CacheManager {
/// File statistics cache; `None` unless one was supplied in the config.
file_statistic_cache: Option<Arc<dyn FileStatisticsCache>>,
/// List-files cache; `None` when the configured list-files limit is 0.
list_files_cache: Option<Arc<dyn ListFilesCache>>,
/// File metadata cache; always present, because `try_new` creates a
/// `DefaultFilesMetadataCache` when the config does not supply one.
file_metadata_cache: Arc<dyn FileMetadataCache>,
}
impl CacheManager {
    /// Builds a shared [`CacheManager`] from `config`.
    ///
    /// * The statistics cache is taken from the config as-is (possibly `None`).
    /// * The list-files cache is disabled (`None`) when
    ///   `config.list_files_cache_limit` is 0. Otherwise a user-supplied cache
    ///   has its limit updated, and its TTL updated only when the config
    ///   carries `Some` TTL; when no cache is supplied a
    ///   [`DefaultListFilesCache`] is created from the configured limit and TTL.
    /// * The file metadata cache is the supplied one or a new
    ///   [`DefaultFilesMetadataCache`]; either way its limit is set to
    ///   `config.metadata_cache_limit`.
    ///
    /// The current implementation has no failing path and always returns `Ok`.
    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
        let file_statistic_cache = config.table_files_statistics_cache.clone();

        let list_limit = config.list_files_cache_limit;
        let list_files_cache: Option<Arc<dyn ListFilesCache>> = if list_limit == 0 {
            // A zero limit switches the list-files cache off entirely.
            None
        } else if let Some(existing) = config.list_files_cache.as_ref() {
            // The limit may differ from what the cache was created with.
            existing.update_cache_limit(list_limit);
            // Leave the cache's own TTL alone unless the config sets one.
            if let Some(ttl) = config.list_files_cache_ttl {
                existing.update_cache_ttl(Some(ttl));
            }
            Some(Arc::clone(existing))
        } else {
            let fresh: Arc<dyn ListFilesCache> = Arc::new(DefaultListFilesCache::new(
                list_limit,
                config.list_files_cache_ttl,
            ));
            Some(fresh)
        };

        let metadata_limit = config.metadata_cache_limit;
        let file_metadata_cache: Arc<dyn FileMetadataCache> =
            match config.file_metadata_cache.as_ref() {
                Some(shared) => Arc::clone(shared),
                None => Arc::new(DefaultFilesMetadataCache::new(metadata_limit)),
            };
        // Applied in both cases, so a supplied cache also picks up the limit.
        file_metadata_cache.update_cache_limit(metadata_limit);

        let manager = CacheManager {
            file_statistic_cache,
            list_files_cache,
            file_metadata_cache,
        };
        Ok(Arc::new(manager))
    }

    /// Returns a shared handle to the file statistics cache, if one is set.
    pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
        self.file_statistic_cache.as_ref().map(Arc::clone)
    }

    /// Returns a shared handle to the list-files cache, if it is enabled.
    pub fn get_list_files_cache(&self) -> Option<Arc<dyn ListFilesCache>> {
        self.list_files_cache.as_ref().map(Arc::clone)
    }

    /// Returns the list-files cache's current limit, or 0 when the cache is
    /// disabled.
    pub fn get_list_files_cache_limit(&self) -> usize {
        match &self.list_files_cache {
            Some(cache) => cache.cache_limit(),
            None => 0,
        }
    }

    /// Returns the list-files cache's current TTL; `None` when the cache is
    /// disabled or has no TTL.
    pub fn get_list_files_cache_ttl(&self) -> Option<Duration> {
        self.list_files_cache.as_ref()?.cache_ttl()
    }

    /// Returns a shared handle to the file metadata cache.
    pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
        let cache = &self.file_metadata_cache;
        Arc::clone(cache)
    }

    /// Returns the file metadata cache's current limit.
    pub fn get_metadata_cache_limit(&self) -> usize {
        let cache = &self.file_metadata_cache;
        cache.cache_limit()
    }
}
/// Default value of `CacheManagerConfig::metadata_cache_limit`:
/// `50 * 1024 * 1024` (52,428,800).
///
/// NOTE(review): presumably a size in bytes, i.e. 50 MiB — confirm against the
/// metadata cache implementation.
pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024;
/// Settings consumed by [`CacheManager::try_new`]. Build one with
/// `CacheManagerConfig::default()` and the `with_*` methods.
#[derive(Clone)]
pub struct CacheManagerConfig {
/// Optional file statistics cache; passed through to the manager unchanged.
pub table_files_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
/// Optional user-supplied list-files cache. When `None` and the limit below
/// is non-zero, `try_new` creates a `DefaultListFilesCache`.
pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
/// Limit applied to the list-files cache. A value of 0 disables the
/// list-files cache, even if one was supplied.
pub list_files_cache_limit: usize,
/// TTL for the list-files cache. `None` leaves a user-supplied cache's
/// existing TTL untouched.
pub list_files_cache_ttl: Option<Duration>,
/// Optional user-supplied file metadata cache. When `None`, `try_new`
/// creates a `DefaultFilesMetadataCache`.
pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
/// Limit applied to the file metadata cache, whether supplied or created.
pub metadata_cache_limit: usize,
}
impl Default for CacheManagerConfig {
fn default() -> Self {
Self {
table_files_statistics_cache: Default::default(),
list_files_cache: Default::default(),
list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
file_metadata_cache: Default::default(),
metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
}
}
}
impl CacheManagerConfig {
    /// Sets (or clears, with `None`) the file statistics cache and returns
    /// the updated config.
    pub fn with_files_statistics_cache(
        self,
        cache: Option<Arc<dyn FileStatisticsCache>>,
    ) -> Self {
        Self {
            table_files_statistics_cache: cache,
            ..self
        }
    }

    /// Sets (or clears, with `None`) the user-supplied list-files cache and
    /// returns the updated config.
    pub fn with_list_files_cache(
        self,
        cache: Option<Arc<dyn ListFilesCache>>,
    ) -> Self {
        Self {
            list_files_cache: cache,
            ..self
        }
    }

    /// Sets the list-files cache limit and returns the updated config.
    /// `CacheManager::try_new` treats a limit of 0 as "cache disabled".
    pub fn with_list_files_cache_limit(self, limit: usize) -> Self {
        Self {
            list_files_cache_limit: limit,
            ..self
        }
    }

    /// Sets the list-files cache TTL and returns the updated config. With
    /// `None`, `CacheManager::try_new` keeps a supplied cache's own TTL.
    pub fn with_list_files_cache_ttl(self, ttl: Option<Duration>) -> Self {
        Self {
            list_files_cache_ttl: ttl,
            ..self
        }
    }

    /// Sets (or clears, with `None`) the user-supplied file metadata cache
    /// and returns the updated config.
    pub fn with_file_metadata_cache(
        self,
        cache: Option<Arc<dyn FileMetadataCache>>,
    ) -> Self {
        Self {
            file_metadata_cache: cache,
            ..self
        }
    }

    /// Sets the file metadata cache limit and returns the updated config.
    pub fn with_metadata_cache_limit(self, limit: usize) -> Self {
        Self {
            metadata_cache_limit: limit,
            ..self
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::DefaultListFilesCache;
    use std::time::Duration;

    /// TTL given to the user-supplied cache before it reaches the manager.
    const ONE_SECOND: Option<Duration> = Some(Duration::from_secs(1));

    /// Creates the list-files cache both tests hand to the config.
    fn one_second_cache() -> DefaultListFilesCache {
        DefaultListFilesCache::new(1024, ONE_SECOND)
    }

    /// Builds a manager from `config` and reports the TTL of its list-files
    /// cache. Panics if the manager cannot be built or has no such cache.
    fn managed_ttl(config: &CacheManagerConfig) -> Option<Duration> {
        let manager = CacheManager::try_new(config).unwrap();
        let cache = manager.get_list_files_cache().unwrap();
        cache.cache_ttl()
    }

    /// A config that carries no TTL must not overwrite the cache's own TTL.
    #[test]
    fn test_ttl_preserved_when_not_set_in_config() {
        let user_cache = one_second_cache();
        assert_eq!(
            user_cache.cache_ttl(),
            ONE_SECOND,
            "Cache should have TTL = 1 second initially"
        );

        let config =
            CacheManagerConfig::default().with_list_files_cache(Some(Arc::new(user_cache)));
        let cache_ttl = managed_ttl(&config);

        assert!(
            cache_ttl.is_some(),
            "TTL should be preserved when not set in config. Expected Some(Duration::from_secs(1)), got {cache_ttl:?}"
        );
        assert_eq!(cache_ttl, ONE_SECOND, "TTL should be exactly 1 second");
    }

    /// A TTL set in the config replaces the cache's own TTL.
    #[test]
    fn test_ttl_overridden_when_set_in_config() {
        let sixty_seconds = Some(Duration::from_secs(60));
        let config = CacheManagerConfig::default()
            .with_list_files_cache(Some(Arc::new(one_second_cache())))
            .with_list_files_cache_ttl(sixty_seconds);

        let cache_ttl = managed_ttl(&config);

        assert_eq!(
            cache_ttl, sixty_seconds,
            "TTL should be overridden to 60 seconds when set in config"
        );
    }
}