use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use crate::cache::cache_manager::{
FileMetadata, FileMetadataCache, FileMetadataCacheEntry,
};
use crate::cache::lru_queue::LruQueue;
use crate::cache::CacheAccessor;
use datafusion_common::Statistics;
use dashmap::DashMap;
use object_store::path::Path;
use object_store::ObjectMeta;
/// Collected file statistics keyed by file path.
///
/// Each entry stores the `ObjectMeta` observed when the statistics were
/// computed; `get_with_extra` compares size and last-modified time against it
/// to detect stale entries.
#[derive(Default)]
pub struct DefaultFileStatisticsCache {
    // path -> (metadata at caching time, statistics for that file version)
    statistics: DashMap<Path, (ObjectMeta, Arc<Statistics>)>,
}
impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
    type Extra = ObjectMeta;

    /// Returns the cached statistics for `k` without checking whether the file
    /// has changed since they were cached (use [`Self::get_with_extra`] for that).
    fn get(&self, k: &Path) -> Option<Arc<Statistics>> {
        // Simplified from `.map(|s| Some(..)).unwrap_or(None)`: a plain `map`
        // already yields the `Option<Arc<Statistics>>` we need.
        self.statistics.get(k).map(|s| Arc::clone(&s.value().1))
    }

    /// Returns the cached statistics for `k` only if the `ObjectMeta` stored
    /// with them still matches `e` (same size and last-modified time);
    /// otherwise the entry is considered stale and `None` is returned.
    fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Arc<Statistics>> {
        // `and_then` replaces the `.map(..).unwrap_or(None)` anti-pattern.
        self.statistics.get(k).and_then(|s| {
            let (saved_meta, statistics) = s.value();
            if saved_meta.size != e.size
                || saved_meta.last_modified != e.last_modified
            {
                // The file changed since the statistics were cached.
                None
            } else {
                Some(Arc::clone(statistics))
            }
        })
    }

    /// Unsupported: statistics must be stored together with the file's
    /// `ObjectMeta`; use [`Self::put_with_extra`] instead.
    fn put(&self, _key: &Path, _value: Arc<Statistics>) -> Option<Arc<Statistics>> {
        panic!("Put cache in DefaultFileStatisticsCache without Extra not supported.")
    }

    /// Caches `value` for `key`, remembering `e` so later lookups can detect
    /// staleness. Returns the previously cached statistics, if any.
    fn put_with_extra(
        &self,
        key: &Path,
        value: Arc<Statistics>,
        e: &Self::Extra,
    ) -> Option<Arc<Statistics>> {
        self.statistics
            .insert(key.clone(), (e.clone(), value))
            .map(|(_, statistics)| statistics)
    }

    /// Removes the entry for `k`, returning its statistics if it existed.
    fn remove(&mut self, k: &Path) -> Option<Arc<Statistics>> {
        // Destructuring is clearer than the original `|x| x.1 .1`.
        self.statistics
            .remove(k)
            .map(|(_, (_, statistics))| statistics)
    }

    fn contains_key(&self, k: &Path) -> bool {
        self.statistics.contains_key(k)
    }

    fn len(&self) -> usize {
        self.statistics.len()
    }

    fn clear(&self) {
        self.statistics.clear()
    }

    fn name(&self) -> String {
        "DefaultFileStatisticsCache".to_string()
    }
}
/// Caches the result of listing files under a path (e.g. a table location),
/// keyed by that path.
#[derive(Default)]
pub struct DefaultListFilesCache {
    // NOTE(review): the field is named `statistics` but stores directory
    // listings; kept as-is since renaming is beyond a doc-only change.
    statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
}
impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
    type Extra = ObjectMeta;

    /// Returns the cached listing for `k`, if any.
    fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
        let entry = self.statistics.get(k)?;
        Some(Arc::clone(entry.value()))
    }

    /// Unsupported: listings are keyed by path alone; use [`Self::get`].
    fn get_with_extra(
        &self,
        _k: &Path,
        _e: &Self::Extra,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        panic!("Not supported DefaultListFilesCache get_with_extra")
    }

    /// Stores the listing for `key`, returning the previous one if present.
    fn put(
        &self,
        key: &Path,
        value: Arc<Vec<ObjectMeta>>,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        self.statistics.insert(key.to_owned(), value)
    }

    /// Unsupported: listings are keyed by path alone; use [`Self::put`].
    fn put_with_extra(
        &self,
        _key: &Path,
        _value: Arc<Vec<ObjectMeta>>,
        _e: &Self::Extra,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        panic!("Not supported DefaultListFilesCache put_with_extra")
    }

    /// Removes the listing for `k`, returning it if it existed.
    fn remove(&mut self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
        self.statistics.remove(k).map(|(_, files)| files)
    }

    fn contains_key(&self, k: &Path) -> bool {
        self.statistics.contains_key(k)
    }

    fn len(&self) -> usize {
        self.statistics.len()
    }

    fn clear(&self) {
        self.statistics.clear()
    }

    fn name(&self) -> String {
        "DefaultListFilesCache".to_string()
    }
}
/// Interior state of `DefaultFilesMetadataCache`, guarded by a `Mutex` in the
/// owning type so all operations here can simply take `&mut self`.
struct DefaultFilesMetadataCacheState {
    // LRU order: the least recently used entry is evicted first when the
    // memory budget is exceeded.
    lru_queue: LruQueue<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
    // Maximum total `memory_size()` of cached metadata, in bytes.
    memory_limit: usize,
    // Running sum of `memory_size()` over all currently cached entries.
    memory_used: usize,
    // Per-path hit counters, surfaced through `list_entries`.
    cache_hits: HashMap<Path, usize>,
}
impl DefaultFilesMetadataCacheState {
    fn new(memory_limit: usize) -> Self {
        Self {
            lru_queue: LruQueue::new(),
            memory_limit,
            memory_used: 0,
            cache_hits: HashMap::new(),
        }
    }

    /// Returns the metadata cached for `k` if it is still current (same size
    /// and last-modified time). A successful lookup promotes the entry in the
    /// LRU order and increments its hit counter; a stale entry yields `None`.
    fn get(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
        // `and_then` instead of the `.map(..).unwrap_or(None)` anti-pattern.
        self.lru_queue
            .get(&k.location)
            .and_then(|(object_meta, metadata)| {
                if object_meta.size != k.size
                    || object_meta.last_modified != k.last_modified
                {
                    // The file changed since the metadata was cached.
                    None
                } else {
                    *self.cache_hits.entry(k.location.clone()).or_insert(0) += 1;
                    Some(Arc::clone(metadata))
                }
            })
    }

    /// Like `get`, but without promoting the entry or counting a hit.
    fn contains_key(&self, k: &ObjectMeta) -> bool {
        self.lru_queue
            .peek(&k.location)
            .map_or(false, |(object_meta, _)| {
                object_meta.size == k.size
                    && object_meta.last_modified == k.last_modified
            })
    }

    /// Inserts `value` under `key.location`, returning the metadata it
    /// replaced, if any.
    ///
    /// A value larger than the entire memory limit is rejected (not cached)
    /// and `None` is returned. Otherwise the insert may evict
    /// least-recently-used entries until the budget is respected again; the
    /// freshly inserted entry is most-recently-used, so it survives.
    fn put(
        &mut self,
        key: ObjectMeta,
        value: Arc<dyn FileMetadata>,
    ) -> Option<Arc<dyn FileMetadata>> {
        let value_size = value.memory_size();

        // An entry bigger than the whole budget can never fit.
        if value_size > self.memory_limit {
            return None;
        }

        // A (re)insert resets the hit counter for this path.
        self.cache_hits.insert(key.location.clone(), 0);

        let old_value = self.lru_queue.put(key.location.clone(), (key, value));

        // Account for the new entry, then credit back whatever it replaced.
        self.memory_used += value_size;
        if let Some((_, ref old_metadata)) = old_value {
            self.memory_used -= old_metadata.memory_size();
        }
        self.evict_entries();

        old_value.map(|v| v.1)
    }

    /// Pops least-recently-used entries until `memory_used <= memory_limit`.
    fn evict_entries(&mut self) {
        while self.memory_used > self.memory_limit {
            if let Some((location, (_, metadata))) = self.lru_queue.pop() {
                self.memory_used -= metadata.memory_size();
                // Also drop the hit counter: previously evicted paths left
                // their counters behind in `cache_hits`, which grew without
                // bound (only `remove`/`clear` cleaned them up).
                self.cache_hits.remove(&location);
            } else {
                // `memory_used` should be exactly the sum of the sizes of the
                // queued entries, so an empty queue here means the accounting
                // is broken.
                debug_assert!(
                    false,
                    "cache is empty while memory_used > memory_limit, cannot happen"
                );
                return;
            }
        }
    }

    /// Removes the entry for `k.location`, returning its metadata if present,
    /// and releases its memory and hit counter.
    fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
        if let Some((_, old_metadata)) = self.lru_queue.remove(&k.location) {
            self.memory_used -= old_metadata.memory_size();
            self.cache_hits.remove(&k.location);
            Some(old_metadata)
        } else {
            None
        }
    }

    fn len(&self) -> usize {
        self.lru_queue.len()
    }

    /// Drops all entries and resets the memory accounting.
    fn clear(&mut self) {
        self.lru_queue.clear();
        self.memory_used = 0;
        self.cache_hits.clear();
    }
}
/// Thread-safe, memory-bounded LRU cache of file metadata, keyed by the file's
/// `ObjectMeta` (entries are invalidated when size or last-modified change).
pub struct DefaultFilesMetadataCache {
    // All state lives behind one mutex; every operation locks, acts, releases.
    state: Mutex<DefaultFilesMetadataCacheState>,
}
impl DefaultFilesMetadataCache {
    /// Creates a metadata cache bounded to `memory_limit` bytes, as reported
    /// by each entry's `memory_size()`.
    pub fn new(memory_limit: usize) -> Self {
        let state = DefaultFilesMetadataCacheState::new(memory_limit);
        Self {
            state: Mutex::new(state),
        }
    }

    /// Total bytes currently accounted for by the cached entries.
    pub fn memory_used(&self) -> usize {
        self.state.lock().unwrap().memory_used
    }
}
impl FileMetadataCache for DefaultFilesMetadataCache {
    /// Current memory budget, in bytes.
    fn cache_limit(&self) -> usize {
        self.state.lock().unwrap().memory_limit
    }

    /// Changes the memory budget; shrinking it evicts LRU entries immediately.
    fn update_cache_limit(&self, limit: usize) {
        let mut guard = self.state.lock().unwrap();
        guard.memory_limit = limit;
        guard.evict_entries();
    }

    /// Snapshot of every cached entry with its size, hit count, and extra info.
    fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry> {
        let guard = self.state.lock().unwrap();
        let mut snapshot = HashMap::<Path, FileMetadataCacheEntry>::new();
        for (location, (meta, file_metadata)) in guard.lru_queue.list_entries() {
            let entry = FileMetadataCacheEntry {
                object_meta: meta.clone(),
                size_bytes: file_metadata.memory_size(),
                // Every queued entry has a counter (inserted on `put`).
                hits: *guard.cache_hits.get(location).expect("entry must exist"),
                extra: file_metadata.extra_info(),
            };
            snapshot.insert(location.clone(), entry);
        }
        snapshot
    }
}
impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for DefaultFilesMetadataCache {
    type Extra = ObjectMeta;

    /// Looks up metadata for `k`; the whole operation runs under the state lock.
    fn get(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
        self.state.lock().unwrap().get(k)
    }

    /// Same as [`Self::get`]; the extra `ObjectMeta` is not needed here.
    fn get_with_extra(
        &self,
        k: &ObjectMeta,
        _e: &Self::Extra,
    ) -> Option<Arc<dyn FileMetadata>> {
        self.get(k)
    }

    /// Stores `value` for `key`, returning the previously cached metadata, if any.
    fn put(
        &self,
        key: &ObjectMeta,
        value: Arc<dyn FileMetadata>,
    ) -> Option<Arc<dyn FileMetadata>> {
        self.state.lock().unwrap().put(key.clone(), value)
    }

    /// Same as [`Self::put`]; the extra `ObjectMeta` is not needed here.
    fn put_with_extra(
        &self,
        key: &ObjectMeta,
        value: Arc<dyn FileMetadata>,
        _e: &Self::Extra,
    ) -> Option<Arc<dyn FileMetadata>> {
        self.put(key, value)
    }

    /// Drops the entry for `k`, returning its metadata if it was cached.
    fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
        self.state.lock().unwrap().remove(k)
    }

    fn contains_key(&self, k: &ObjectMeta) -> bool {
        self.state.lock().unwrap().contains_key(k)
    }

    fn len(&self) -> usize {
        self.state.lock().unwrap().len()
    }

    fn clear(&self) {
        self.state.lock().unwrap().clear()
    }

    fn name(&self) -> String {
        "DefaultFilesMetadataCache".to_string()
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use crate::cache::cache_manager::{
FileMetadata, FileMetadataCache, FileMetadataCacheEntry,
};
use crate::cache::cache_unit::{
DefaultFileStatisticsCache, DefaultFilesMetadataCache, DefaultListFilesCache,
};
use crate::cache::CacheAccessor;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use chrono::DateTime;
use datafusion_common::Statistics;
use object_store::path::Path;
use object_store::ObjectMeta;
#[test]
fn test_statistics_cache() {
    let meta = ObjectMeta {
        location: Path::from("test"),
        last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
            .unwrap()
            .into(),
        size: 1024,
        e_tag: None,
        version: None,
    };
    let cache = DefaultFileStatisticsCache::default();
    // Miss: nothing cached yet.
    assert!(cache.get_with_extra(&meta.location, &meta).is_none());
    cache.put_with_extra(
        &meta.location,
        Statistics::new_unknown(&Schema::new(vec![Field::new(
            "test_column",
            DataType::Timestamp(TimeUnit::Second, None),
            false,
        )]))
        .into(),
        &meta,
    );
    // Hit: same path, same size and last-modified time.
    assert!(cache.get_with_extra(&meta.location, &meta).is_some());
    // Miss: the file size changed, so the cached statistics are stale.
    let mut meta2 = meta.clone();
    meta2.size = 2048;
    assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
    // Miss: the last-modified time changed.
    let mut meta2 = meta.clone();
    meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
        .unwrap()
        .into();
    assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
    // Miss: a different path was never cached.
    let mut meta2 = meta;
    meta2.location = Path::from("test2");
    assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
}
#[test]
fn test_list_file_cache() {
    let object_meta = ObjectMeta {
        location: Path::from("test"),
        last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
            .unwrap()
            .into(),
        size: 1024,
        e_tag: None,
        version: None,
    };
    let cache = DefaultListFilesCache::default();

    // Miss before anything has been stored.
    assert!(cache.get(&object_meta.location).is_none());

    // A stored listing is returned verbatim.
    cache.put(&object_meta.location, vec![object_meta.clone()].into());
    let listed = cache.get(&object_meta.location).unwrap();
    assert_eq!(listed.first().unwrap().clone(), object_meta);
}
/// Minimal `FileMetadata` implementation for tests: the cached "metadata" is
/// just a string, and the reported memory size is its length, letting tests
/// control the cache's memory accounting precisely.
pub struct TestFileMetadata {
    metadata: String,
}

impl FileMetadata for TestFileMetadata {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    // One byte per character of the payload string.
    fn memory_size(&self) -> usize {
        self.metadata.len()
    }

    // Fixed extra info, asserted verbatim by the `list_entries` tests.
    fn extra_info(&self) -> HashMap<String, String> {
        HashMap::from([("extra_info".to_owned(), "abc".to_owned())])
    }
}
#[test]
fn test_default_file_metadata_cache() {
    let object_meta = ObjectMeta {
        location: Path::from("test"),
        last_modified: DateTime::parse_from_rfc3339("2025-07-29T12:12:12+00:00")
            .unwrap()
            .into(),
        size: 1024,
        e_tag: None,
        version: None,
    };
    let metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
        metadata: "retrieved_metadata".to_owned(),
    });
    let mut cache = DefaultFilesMetadataCache::new(1024 * 1024);
    // Miss before insertion.
    assert!(cache.get(&object_meta).is_none());
    cache.put(&object_meta, Arc::clone(&metadata));
    assert!(cache.contains_key(&object_meta));
    // The cached trait object can be downcast back to the concrete test type.
    let value = cache.get(&object_meta);
    assert!(value.is_some());
    let test_file_metadata = Arc::downcast::<TestFileMetadata>(value.unwrap());
    assert!(test_file_metadata.is_ok());
    assert_eq!(test_file_metadata.unwrap().metadata, "retrieved_metadata");
    // Miss: a different size means the cached entry is stale.
    let mut object_meta2 = object_meta.clone();
    object_meta2.size = 2048;
    assert!(cache.get(&object_meta2).is_none());
    assert!(!cache.contains_key(&object_meta2));
    // Miss: a different last-modified time means the cached entry is stale.
    let mut object_meta2 = object_meta.clone();
    object_meta2.last_modified =
        DateTime::parse_from_rfc3339("2025-07-29T13:13:13+00:00")
            .unwrap()
            .into();
    assert!(cache.get(&object_meta2).is_none());
    assert!(!cache.contains_key(&object_meta2));
    // Miss: a different path was never cached.
    let mut object_meta2 = object_meta.clone();
    object_meta2.location = Path::from("test2");
    assert!(cache.get(&object_meta2).is_none());
    assert!(!cache.contains_key(&object_meta2));
    // Removal empties the entry.
    cache.remove(&object_meta);
    assert!(cache.get(&object_meta).is_none());
    assert!(!cache.contains_key(&object_meta));
    // Two distinct paths make two entries; clear() drops them all.
    cache.put(&object_meta, Arc::clone(&metadata));
    cache.put(&object_meta2, metadata);
    assert_eq!(cache.len(), 2);
    cache.clear();
    assert_eq!(cache.len(), 0);
}
/// Builds an `ObjectMeta`/`FileMetadata` pair for `path` whose reported
/// memory size is exactly `size` bytes (a string of `size` repeated chars).
fn generate_test_metadata_with_size(
    path: &str,
    size: usize,
) -> (ObjectMeta, Arc<dyn FileMetadata>) {
    let file_metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
        metadata: "a".repeat(size),
    });
    let object_meta = ObjectMeta {
        location: Path::from(path),
        last_modified: chrono::Utc::now(),
        size: size as u64,
        e_tag: None,
        version: None,
    };
    (object_meta, file_metadata)
}
#[test]
fn test_default_file_metadata_cache_with_limit() {
    // Budget: 1000 "bytes" (TestFileMetadata reports its string length).
    let mut cache = DefaultFilesMetadataCache::new(1000);
    let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100);
    let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 500);
    let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300);
    cache.put(&object_meta1, metadata1);
    cache.put(&object_meta2, metadata2);
    cache.put(&object_meta3, metadata3);
    // 100 + 500 + 300 = 900 fits under the limit; no eviction.
    assert_eq!(cache.len(), 3);
    assert_eq!(cache.memory_used(), 900);
    assert!(cache.contains_key(&object_meta1));
    assert!(cache.contains_key(&object_meta2));
    assert!(cache.contains_key(&object_meta3));
    // Adding 200 would reach 1100, so the LRU entry ("1") is evicted.
    let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 200);
    cache.put(&object_meta4, metadata4);
    assert_eq!(cache.len(), 3);
    assert_eq!(cache.memory_used(), 1000);
    assert!(!cache.contains_key(&object_meta1));
    assert!(cache.contains_key(&object_meta4));
    // Touching "2" promotes it, so the next eviction removes "3" instead.
    cache.get(&object_meta2);
    let (object_meta5, metadata5) = generate_test_metadata_with_size("5", 100);
    cache.put(&object_meta5, metadata5);
    assert_eq!(cache.len(), 3);
    assert_eq!(cache.memory_used(), 800);
    assert!(!cache.contains_key(&object_meta3));
    assert!(cache.contains_key(&object_meta5));
    // An entry larger than the whole budget (1200 > 1000) is rejected outright.
    let (object_meta6, metadata6) = generate_test_metadata_with_size("6", 1200);
    cache.put(&object_meta6, metadata6);
    assert_eq!(cache.len(), 3);
    assert_eq!(cache.memory_used(), 800);
    assert!(!cache.contains_key(&object_meta6));
    // 800 + 200 = 1000 still fits without eviction.
    let (object_meta7, metadata7) = generate_test_metadata_with_size("7", 200);
    cache.put(&object_meta7, metadata7);
    assert_eq!(cache.len(), 4);
    assert_eq!(cache.memory_used(), 1000);
    assert!(cache.contains_key(&object_meta7));
    // A 999-byte entry fits the budget but forces everything else out.
    let (object_meta8, metadata8) = generate_test_metadata_with_size("8", 999);
    cache.put(&object_meta8, metadata8);
    assert_eq!(cache.len(), 1);
    assert_eq!(cache.memory_used(), 999);
    assert!(cache.contains_key(&object_meta8));
    // Re-inserting an existing key replaces the value in place...
    let (object_meta9, metadata9) = generate_test_metadata_with_size("9", 300);
    let (object_meta10, metadata10) = generate_test_metadata_with_size("10", 200);
    let (object_meta11_v1, metadata11_v1) =
        generate_test_metadata_with_size("11", 400);
    cache.put(&object_meta9, metadata9);
    cache.put(&object_meta10, metadata10);
    cache.put(&object_meta11_v1, metadata11_v1);
    assert_eq!(cache.memory_used(), 900);
    assert_eq!(cache.len(), 3);
    let (object_meta11_v2, metadata11_v2) =
        generate_test_metadata_with_size("11", 500);
    cache.put(&object_meta11_v2, metadata11_v2);
    // ...only the size delta (500 - 400) is charged, and nothing is evicted.
    assert_eq!(cache.memory_used(), 1000);
    assert_eq!(cache.len(), 3);
    assert!(cache.contains_key(&object_meta9));
    assert!(cache.contains_key(&object_meta10));
    assert!(cache.contains_key(&object_meta11_v2));
    assert!(!cache.contains_key(&object_meta11_v1));
    // Replacing with one extra byte (501) exceeds the limit by 1, which
    // evicts the current LRU entry ("9", since "11" was just re-put).
    let (object_meta11_v3, metadata11_v3) =
        generate_test_metadata_with_size("11", 501);
    cache.put(&object_meta11_v3, metadata11_v3);
    assert_eq!(cache.memory_used(), 701);
    assert_eq!(cache.len(), 2);
    assert!(cache.contains_key(&object_meta10));
    assert!(cache.contains_key(&object_meta11_v3));
    assert!(!cache.contains_key(&object_meta11_v2));
    // Explicit removal releases the entry's memory.
    cache.remove(&object_meta11_v3);
    assert_eq!(cache.len(), 1);
    assert_eq!(cache.memory_used(), 200);
    assert!(cache.contains_key(&object_meta10));
    assert!(!cache.contains_key(&object_meta11_v3));
    // clear() resets both the entries and the memory accounting.
    cache.clear();
    assert_eq!(cache.len(), 0);
    assert_eq!(cache.memory_used(), 0);
    // Lowering the limit at runtime evicts LRU entries until it fits:
    // only the most recent entry ("14", 500) survives a new limit of 600.
    let (object_meta12, metadata12) = generate_test_metadata_with_size("12", 300);
    let (object_meta13, metadata13) = generate_test_metadata_with_size("13", 200);
    let (object_meta14, metadata14) = generate_test_metadata_with_size("14", 500);
    cache.put(&object_meta12, metadata12);
    cache.put(&object_meta13, metadata13);
    cache.put(&object_meta14, metadata14);
    assert_eq!(cache.len(), 3);
    assert_eq!(cache.memory_used(), 1000);
    cache.update_cache_limit(600);
    assert_eq!(cache.len(), 1);
    assert_eq!(cache.memory_used(), 500);
    assert!(!cache.contains_key(&object_meta12));
    assert!(!cache.contains_key(&object_meta13));
    assert!(cache.contains_key(&object_meta14));
}
#[test]
fn test_default_file_metadata_cache_entries_info() {
    // Helper: the `(Path, FileMetadataCacheEntry)` pair that `list_entries`
    // is expected to report for a given file.
    fn expected(
        meta: &ObjectMeta,
        size_bytes: usize,
        hits: usize,
    ) -> (Path, FileMetadataCacheEntry) {
        (
            meta.location.clone(),
            FileMetadataCacheEntry {
                object_meta: meta.clone(),
                size_bytes,
                hits,
                extra: HashMap::from([(
                    "extra_info".to_owned(),
                    "abc".to_owned(),
                )]),
            },
        )
    }

    let mut cache = DefaultFilesMetadataCache::new(1000);
    let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100);
    let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 200);
    let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300);
    cache.put(&object_meta1, metadata1);
    cache.put(&object_meta2, metadata2);
    cache.put(&object_meta3, metadata3);

    // Freshly inserted entries report zero hits.
    assert_eq!(
        cache.list_entries(),
        HashMap::from([
            expected(&object_meta1, 100, 0),
            expected(&object_meta2, 200, 0),
            expected(&object_meta3, 300, 0),
        ])
    );

    // A successful get bumps the hit counter of that entry only.
    cache.get(&object_meta1);
    assert_eq!(
        cache.list_entries(),
        HashMap::from([
            expected(&object_meta1, 100, 1),
            expected(&object_meta2, 200, 0),
            expected(&object_meta3, 300, 0),
        ])
    );

    // Inserting 600 bytes exceeds the 1000-byte limit (100 + 200 + 300 + 600),
    // evicting the least recently used entry ("2", since "1" was just touched).
    let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 600);
    cache.put(&object_meta4, metadata4);
    assert_eq!(
        cache.list_entries(),
        HashMap::from([
            expected(&object_meta1, 100, 1),
            expected(&object_meta3, 300, 0),
            expected(&object_meta4, 600, 0),
        ])
    );

    // Overwriting an entry replaces its metadata and resets its hit count.
    let (object_meta1_new, metadata1_new) = generate_test_metadata_with_size("1", 50);
    cache.put(&object_meta1_new, metadata1_new);
    assert_eq!(
        cache.list_entries(),
        HashMap::from([
            expected(&object_meta1_new, 50, 0),
            expected(&object_meta3, 300, 0),
            expected(&object_meta4, 600, 0),
        ])
    );

    // Removed entries disappear from the listing.
    cache.remove(&object_meta4);
    assert_eq!(
        cache.list_entries(),
        HashMap::from([
            expected(&object_meta1_new, 50, 0),
            expected(&object_meta3, 300, 0),
        ])
    );

    // clear() leaves no entries behind.
    cache.clear();
    assert_eq!(cache.list_entries(), HashMap::from([]));
}
}