use std::{collections::HashMap, sync::Mutex};
use object_store::path::Path;
use crate::cache::{
CacheAccessor,
cache_manager::{CachedFileMetadataEntry, FileMetadataCache, FileMetadataCacheEntry},
lru_queue::LruQueue,
};
/// Internal state of [`DefaultFilesMetadataCache`]; the outer type wraps this in a
/// `Mutex`, so all fields here are accessed under that lock.
struct DefaultFilesMetadataCacheState {
    // Entries in recency order; the least recently used entry is evicted first.
    lru_queue: LruQueue<Path, CachedFileMetadataEntry>,
    // Maximum total size of cached metadata, in bytes (as reported by
    // `FileMetadata::memory_size`).
    memory_limit: usize,
    // Current total size of all cached entries, in bytes.
    memory_used: usize,
    // Per-path hit counters; reset to 0 whenever an entry is (re)inserted.
    cache_hits: HashMap<Path, usize>,
}
impl DefaultFilesMetadataCacheState {
    /// Creates an empty state with the given memory budget, in bytes.
    fn new(memory_limit: usize) -> Self {
        Self {
            lru_queue: LruQueue::new(),
            memory_limit,
            memory_used: 0,
            cache_hits: HashMap::new(),
        }
    }

    /// Returns the cached entry for `k`, if any, promoting it to most recently
    /// used and incrementing its hit counter.
    fn get(&mut self, k: &Path) -> Option<CachedFileMetadataEntry> {
        self.lru_queue.get(k).cloned().inspect(|_| {
            *self.cache_hits.entry(k.clone()).or_insert(0) += 1;
        })
    }

    /// Returns `true` if `k` is cached, without affecting the LRU order or the
    /// hit counters.
    fn contains_key(&self, k: &Path) -> bool {
        self.lru_queue.peek(k).is_some()
    }

    /// Inserts `value` under `key` and returns the previously cached entry for
    /// that key, if any.
    ///
    /// An entry larger than the whole `memory_limit` is rejected up front
    /// (returns `None` and leaves the cache untouched). After a successful
    /// insertion, least recently used entries are evicted until the memory
    /// budget is respected again; the new entry itself fits by the check above,
    /// so it is not evicted by its own insertion.
    fn put(
        &mut self,
        key: Path,
        value: CachedFileMetadataEntry,
    ) -> Option<CachedFileMetadataEntry> {
        let value_size = value.file_metadata.memory_size();

        // an entry bigger than the whole budget can never be cached
        if value_size > self.memory_limit {
            return None;
        }

        // (re)insertion resets the hit counter
        self.cache_hits.insert(key.clone(), 0);
        let old_value = self.lru_queue.put(key, value);

        // account for the new entry, crediting back the replaced one (if any)
        self.memory_used += value_size;
        if let Some(ref old_entry) = old_value {
            self.memory_used -= old_entry.file_metadata.memory_size();
        }
        self.evict_entries();
        old_value
    }

    /// Evicts least recently used entries until `memory_used` no longer exceeds
    /// `memory_limit`.
    fn evict_entries(&mut self) {
        while self.memory_used > self.memory_limit {
            if let Some((path, entry)) = self.lru_queue.pop() {
                self.memory_used -= entry.file_metadata.memory_size();
                // also drop the evicted key's hit counter; otherwise
                // `cache_hits` grows unboundedly with keys that are no longer
                // cached (leak: only `remove`/`clear` cleaned it up before)
                self.cache_hits.remove(&path);
            } else {
                debug_assert!(
                    false,
                    "cache is empty while memory_used > memory_limit, cannot happen"
                );
                return;
            }
        }
    }

    /// Removes the entry for `k`, returning it if it was cached, and releases
    /// its memory accounting and hit counter.
    fn remove(&mut self, k: &Path) -> Option<CachedFileMetadataEntry> {
        if let Some(old_entry) = self.lru_queue.remove(k) {
            self.memory_used -= old_entry.file_metadata.memory_size();
            self.cache_hits.remove(k);
            Some(old_entry)
        } else {
            None
        }
    }

    /// Number of entries currently cached.
    fn len(&self) -> usize {
        self.lru_queue.len()
    }

    /// Removes every entry and resets the memory accounting and hit counters.
    fn clear(&mut self) {
        self.lru_queue.clear();
        self.memory_used = 0;
        self.cache_hits.clear();
    }
}
/// Default, thread-safe, memory-bounded file metadata cache with LRU eviction,
/// keyed by object-store [`Path`].
pub struct DefaultFilesMetadataCache {
    // All state lives behind a single mutex so the cache can be shared across
    // threads through `&self` methods.
    state: Mutex<DefaultFilesMetadataCacheState>,
}
impl DefaultFilesMetadataCache {
    /// Creates a cache that holds at most `memory_limit` bytes of file metadata.
    pub fn new(memory_limit: usize) -> Self {
        let initial_state = DefaultFilesMetadataCacheState::new(memory_limit);
        Self {
            state: Mutex::new(initial_state),
        }
    }

    /// Total size of the currently cached entries, in bytes.
    pub fn memory_used(&self) -> usize {
        self.state.lock().unwrap().memory_used
    }
}
impl CacheAccessor<Path, CachedFileMetadataEntry> for DefaultFilesMetadataCache {
    /// Retrieves the entry for `key`, promoting it to most recently used and
    /// counting a hit.
    fn get(&self, key: &Path) -> Option<CachedFileMetadataEntry> {
        self.state.lock().unwrap().get(key)
    }

    /// Inserts `value` under `key`, returning the replaced entry, if any.
    fn put(
        &self,
        key: &Path,
        value: CachedFileMetadataEntry,
    ) -> Option<CachedFileMetadataEntry> {
        self.state.lock().unwrap().put(key.clone(), value)
    }

    /// Removes and returns the entry for `k`, if cached.
    fn remove(&self, k: &Path) -> Option<CachedFileMetadataEntry> {
        self.state.lock().unwrap().remove(k)
    }

    /// Returns `true` if `k` is cached, without touching the LRU order.
    fn contains_key(&self, k: &Path) -> bool {
        self.state.lock().unwrap().contains_key(k)
    }

    /// Number of entries currently cached.
    fn len(&self) -> usize {
        self.state.lock().unwrap().len()
    }

    /// Drops every cached entry.
    fn clear(&self) {
        self.state.lock().unwrap().clear();
    }

    /// Human-readable name of this cache implementation.
    fn name(&self) -> String {
        "DefaultFilesMetadataCache".to_string()
    }
}
impl FileMetadataCache for DefaultFilesMetadataCache {
    /// Returns the configured memory budget, in bytes.
    fn cache_limit(&self) -> usize {
        self.state.lock().unwrap().memory_limit
    }

    /// Updates the memory budget, evicting LRU entries if the cache now
    /// exceeds it.
    fn update_cache_limit(&self, limit: usize) {
        let mut guard = self.state.lock().unwrap();
        guard.memory_limit = limit;
        guard.evict_entries();
    }

    /// Returns a snapshot of all cached entries keyed by path, with per-entry
    /// size, hit count and extra metadata information.
    fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry> {
        let guard = self.state.lock().unwrap();
        let mut snapshot = HashMap::new();
        for (path, entry) in guard.lru_queue.list_entries() {
            // every key in the queue has a matching hit counter (inserted by `put`)
            let hits = *guard.cache_hits.get(path).expect("entry must exist");
            let info = FileMetadataCacheEntry {
                object_meta: entry.meta.clone(),
                size_bytes: entry.file_metadata.memory_size(),
                hits,
                extra: entry.file_metadata.extra_info(),
            };
            snapshot.insert(path.clone(), info);
        }
        snapshot
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::cache::CacheAccessor;
    use crate::cache::cache_manager::{
        CachedFileMetadataEntry, FileMetadata, FileMetadataCache, FileMetadataCacheEntry,
    };
    use crate::cache::file_metadata_cache::DefaultFilesMetadataCache;
    use object_store::ObjectMeta;
    use object_store::path::Path;

    /// Minimal [`FileMetadata`] implementation whose reported memory size is
    /// the byte length of its inner string.
    pub struct TestFileMetadata {
        metadata: String,
    }

    impl FileMetadata for TestFileMetadata {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn memory_size(&self) -> usize {
            self.metadata.len()
        }

        fn extra_info(&self) -> HashMap<String, String> {
            HashMap::from([("extra_info".to_owned(), "abc".to_owned())])
        }
    }

    /// Builds an `ObjectMeta` for `path` with a fixed modification timestamp
    /// and the given file `size`.
    fn create_test_object_meta(path: &str, size: usize) -> ObjectMeta {
        ObjectMeta {
            location: Path::from(path),
            last_modified: chrono::DateTime::parse_from_rfc3339(
                "2025-07-29T12:12:12+00:00",
            )
            .unwrap()
            .into(),
            size: size as u64,
            e_tag: None,
            version: None,
        }
    }

    // Basic get/put/remove/len/clear behavior, without hitting the memory limit.
    #[test]
    fn test_default_file_metadata_cache() {
        let object_meta = create_test_object_meta("test", 1024);
        let metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
            metadata: "retrieved_metadata".to_owned(),
        });
        let cache = DefaultFilesMetadataCache::new(1024 * 1024);

        // empty cache: lookup misses
        assert!(cache.get(&object_meta.location).is_none());

        // insert, then retrieve and downcast back to the concrete metadata type
        let cached_entry =
            CachedFileMetadataEntry::new(object_meta.clone(), Arc::clone(&metadata));
        cache.put(&object_meta.location, cached_entry);
        assert!(cache.contains_key(&object_meta.location));
        let result = cache.get(&object_meta.location).unwrap();
        let test_file_metadata = Arc::downcast::<TestFileMetadata>(result.file_metadata);
        assert!(test_file_metadata.is_ok());
        assert_eq!(test_file_metadata.unwrap().metadata, "retrieved_metadata");

        // the cached entry is valid for the ObjectMeta it was created from
        let result2 = cache.get(&object_meta.location).unwrap();
        assert!(result2.is_valid_for(&object_meta));

        // same path but different size (2048 vs 1024): the stale entry is
        // still returned but reports itself as no longer valid
        let object_meta2 = create_test_object_meta("test", 2048);
        let result3 = cache.get(&object_meta2.location).unwrap();
        assert!(!result3.is_valid_for(&object_meta2));

        // overwriting the same path stores the refreshed metadata
        let new_entry =
            CachedFileMetadataEntry::new(object_meta2.clone(), Arc::clone(&metadata));
        cache.put(&object_meta2.location, new_entry);
        let result4 = cache.get(&object_meta2.location).unwrap();
        assert_eq!(result4.meta.size, 2048);

        // removal drops the entry
        cache.remove(&object_meta.location);
        assert!(!cache.contains_key(&object_meta.location));

        // len() counts entries; clear() empties the cache
        let object_meta3 = create_test_object_meta("test3", 100);
        cache.put(
            &object_meta.location,
            CachedFileMetadataEntry::new(object_meta.clone(), Arc::clone(&metadata)),
        );
        cache.put(
            &object_meta3.location,
            CachedFileMetadataEntry::new(object_meta3.clone(), Arc::clone(&metadata)),
        );
        assert_eq!(cache.len(), 2);
        cache.clear();
        assert_eq!(cache.len(), 0);
    }

    /// Builds an (`ObjectMeta`, metadata) pair whose metadata reports a memory
    /// size of exactly `size` bytes (a string of `size` 'a' characters).
    fn generate_test_metadata_with_size(
        path: &str,
        size: usize,
    ) -> (ObjectMeta, Arc<dyn FileMetadata>) {
        let object_meta = ObjectMeta {
            location: Path::from(path),
            last_modified: chrono::Utc::now(),
            size: size as u64,
            e_tag: None,
            version: None,
        };
        let metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
            metadata: "a".repeat(size),
        });
        (object_meta, metadata)
    }

    // Memory accounting and LRU eviction around a 1000-byte limit.
    #[test]
    fn test_default_file_metadata_cache_with_limit() {
        let cache = DefaultFilesMetadataCache::new(1000);
        let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100);
        let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 500);
        let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300);
        cache.put(
            &object_meta1.location,
            CachedFileMetadataEntry::new(object_meta1.clone(), metadata1),
        );
        cache.put(
            &object_meta2.location,
            CachedFileMetadataEntry::new(object_meta2.clone(), metadata2),
        );
        cache.put(
            &object_meta3.location,
            CachedFileMetadataEntry::new(object_meta3.clone(), metadata3),
        );

        // 100 + 500 + 300 = 900 bytes: everything fits
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.memory_used(), 900);
        assert!(cache.contains_key(&object_meta1.location));
        assert!(cache.contains_key(&object_meta2.location));
        assert!(cache.contains_key(&object_meta3.location));

        // adding 200 more bytes exceeds the limit: the LRU entry ("1") is evicted
        let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 200);
        cache.put(
            &object_meta4.location,
            CachedFileMetadataEntry::new(object_meta4.clone(), metadata4),
        );
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.memory_used(), 1000);
        assert!(!cache.contains_key(&object_meta1.location));
        assert!(cache.contains_key(&object_meta4.location));

        // a get() promotes "2" to most recently used, so the next eviction
        // removes "3" instead
        let _ = cache.get(&object_meta2.location);
        let (object_meta5, metadata5) = generate_test_metadata_with_size("5", 100);
        cache.put(
            &object_meta5.location,
            CachedFileMetadataEntry::new(object_meta5.clone(), metadata5),
        );
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.memory_used(), 800);
        assert!(!cache.contains_key(&object_meta3.location));
        assert!(cache.contains_key(&object_meta5.location));

        // an entry larger than the whole limit is rejected and nothing changes
        let (object_meta6, metadata6) = generate_test_metadata_with_size("6", 1200);
        cache.put(
            &object_meta6.location,
            CachedFileMetadataEntry::new(object_meta6.clone(), metadata6),
        );
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.memory_used(), 800);
        assert!(!cache.contains_key(&object_meta6.location));

        // 800 + 200 = 1000 fits exactly, no eviction needed
        let (object_meta7, metadata7) = generate_test_metadata_with_size("7", 200);
        cache.put(
            &object_meta7.location,
            CachedFileMetadataEntry::new(object_meta7.clone(), metadata7),
        );
        assert_eq!(cache.len(), 4);
        assert_eq!(cache.memory_used(), 1000);
        assert!(cache.contains_key(&object_meta7.location));

        // a near-limit entry (999 bytes) forces everything else out
        let (object_meta8, metadata8) = generate_test_metadata_with_size("8", 999);
        cache.put(
            &object_meta8.location,
            CachedFileMetadataEntry::new(object_meta8.clone(), metadata8),
        );
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.memory_used(), 999);
        assert!(cache.contains_key(&object_meta8.location));

        // refill with 300 + 200 + 400 = 900 bytes ("8" gets evicted on the way)
        let (object_meta9, metadata9) = generate_test_metadata_with_size("9", 300);
        let (object_meta10, metadata10) = generate_test_metadata_with_size("10", 200);
        let (object_meta11_v1, metadata11_v1) =
            generate_test_metadata_with_size("11", 400);
        cache.put(
            &object_meta9.location,
            CachedFileMetadataEntry::new(object_meta9.clone(), metadata9),
        );
        cache.put(
            &object_meta10.location,
            CachedFileMetadataEntry::new(object_meta10.clone(), metadata10),
        );
        cache.put(
            &object_meta11_v1.location,
            CachedFileMetadataEntry::new(object_meta11_v1.clone(), metadata11_v1),
        );
        assert_eq!(cache.memory_used(), 900);
        assert_eq!(cache.len(), 3);

        // replacing "11" with a 500-byte version lands exactly on the limit:
        // 900 - 400 + 500 = 1000, so nothing is evicted
        let (object_meta11_v2, metadata11_v2) =
            generate_test_metadata_with_size("11", 500);
        cache.put(
            &object_meta11_v2.location,
            CachedFileMetadataEntry::new(object_meta11_v2.clone(), metadata11_v2),
        );
        assert_eq!(cache.memory_used(), 1000);
        assert_eq!(cache.len(), 3);
        assert!(cache.contains_key(&object_meta9.location));
        assert!(cache.contains_key(&object_meta10.location));
        assert!(cache.contains_key(&object_meta11_v2.location));

        // one byte more (501) pushes past the limit and evicts the LRU entry "9"
        let (object_meta11_v3, metadata11_v3) =
            generate_test_metadata_with_size("11", 501);
        cache.put(
            &object_meta11_v3.location,
            CachedFileMetadataEntry::new(object_meta11_v3.clone(), metadata11_v3),
        );
        assert_eq!(cache.memory_used(), 701);
        assert_eq!(cache.len(), 2);
        assert!(cache.contains_key(&object_meta10.location));
        assert!(cache.contains_key(&object_meta11_v3.location));

        // remove() releases the entry's memory accounting
        cache.remove(&object_meta11_v3.location);
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.memory_used(), 200);
        assert!(cache.contains_key(&object_meta10.location));
        assert!(!cache.contains_key(&object_meta11_v3.location));

        // clear() resets both the entries and the memory counter
        cache.clear();
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.memory_used(), 0);

        // lowering the limit below current usage evicts LRU entries ("12", "13")
        // until only "14" (500 bytes) remains
        let (object_meta12, metadata12) = generate_test_metadata_with_size("12", 300);
        let (object_meta13, metadata13) = generate_test_metadata_with_size("13", 200);
        let (object_meta14, metadata14) = generate_test_metadata_with_size("14", 500);
        cache.put(
            &object_meta12.location,
            CachedFileMetadataEntry::new(object_meta12.clone(), metadata12),
        );
        cache.put(
            &object_meta13.location,
            CachedFileMetadataEntry::new(object_meta13.clone(), metadata13),
        );
        cache.put(
            &object_meta14.location,
            CachedFileMetadataEntry::new(object_meta14.clone(), metadata14),
        );
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.memory_used(), 1000);
        cache.update_cache_limit(600);
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.memory_used(), 500);
        assert!(!cache.contains_key(&object_meta12.location));
        assert!(!cache.contains_key(&object_meta13.location));
        assert!(cache.contains_key(&object_meta14.location));
    }

    // list_entries() snapshots: sizes, hit counters, extra info, and how the
    // listing reacts to hits, eviction, replacement, removal and clear.
    #[test]
    fn test_default_file_metadata_cache_entries_info() {
        let cache = DefaultFilesMetadataCache::new(1000);
        let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100);
        let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 200);
        let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300);
        cache.put(
            &object_meta1.location,
            CachedFileMetadataEntry::new(object_meta1.clone(), metadata1),
        );
        cache.put(
            &object_meta2.location,
            CachedFileMetadataEntry::new(object_meta2.clone(), metadata2),
        );
        cache.put(
            &object_meta3.location,
            CachedFileMetadataEntry::new(object_meta3.clone(), metadata3),
        );

        // freshly inserted entries all report 0 hits
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                (
                    Path::from("1"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta1.clone(),
                        size_bytes: 100,
                        hits: 0,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                ),
                (
                    Path::from("2"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta2.clone(),
                        size_bytes: 200,
                        hits: 0,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                ),
                (
                    Path::from("3"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta3.clone(),
                        size_bytes: 300,
                        hits: 0,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                )
            ])
        );

        // a get() on "1" bumps its hit counter to 1
        let _ = cache.get(&object_meta1.location);
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                (
                    Path::from("1"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta1.clone(),
                        size_bytes: 100,
                        hits: 1,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                ),
                (
                    Path::from("2"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta2.clone(),
                        size_bytes: 200,
                        hits: 0,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                ),
                (
                    Path::from("3"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta3.clone(),
                        size_bytes: 300,
                        hits: 0,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                )
            ])
        );

        // inserting "4" (600 bytes) overflows the limit and evicts the LRU
        // entry "2"; it disappears from the listing
        let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 600);
        cache.put(
            &object_meta4.location,
            CachedFileMetadataEntry::new(object_meta4.clone(), metadata4),
        );
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                (
                    Path::from("1"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta1.clone(),
                        size_bytes: 100,
                        hits: 1,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                ),
                (
                    Path::from("3"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta3.clone(),
                        size_bytes: 300,
                        hits: 0,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                ),
                (
                    Path::from("4"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta4.clone(),
                        size_bytes: 600,
                        hits: 0,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                )
            ])
        );

        // replacing "1" updates its size and resets its hit counter to 0
        let (object_meta1_new, metadata1_new) = generate_test_metadata_with_size("1", 50);
        cache.put(
            &object_meta1_new.location,
            CachedFileMetadataEntry::new(object_meta1_new.clone(), metadata1_new),
        );
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                (
                    Path::from("1"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta1_new.clone(),
                        size_bytes: 50,
                        hits: 0,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                ),
                (
                    Path::from("3"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta3.clone(),
                        size_bytes: 300,
                        hits: 0,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                ),
                (
                    Path::from("4"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta4.clone(),
                        size_bytes: 600,
                        hits: 0,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                )
            ])
        );

        // removing "4" drops it from the listing
        cache.remove(&object_meta4.location);
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                (
                    Path::from("1"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta1_new.clone(),
                        size_bytes: 50,
                        hits: 0,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                ),
                (
                    Path::from("3"),
                    FileMetadataCacheEntry {
                        object_meta: object_meta3.clone(),
                        size_bytes: 300,
                        hits: 0,
                        extra: HashMap::from([(
                            "extra_info".to_owned(),
                            "abc".to_owned()
                        )]),
                    }
                )
            ])
        );

        // clear() leaves an empty listing
        cache.clear();
        assert_eq!(cache.list_entries(), HashMap::from([]));
    }
}