1use std::{collections::HashMap, sync::Mutex};
19
20use object_store::path::Path;
21
22use crate::cache::{
23 CacheAccessor,
24 cache_manager::{CachedFileMetadataEntry, FileMetadataCache, FileMetadataCacheEntry},
25 lru_queue::LruQueue,
26};
27
28struct DefaultFilesMetadataCacheState {
30 lru_queue: LruQueue<Path, CachedFileMetadataEntry>,
31 memory_limit: usize,
32 memory_used: usize,
33 cache_hits: HashMap<Path, usize>,
34}
35
36impl DefaultFilesMetadataCacheState {
37 fn new(memory_limit: usize) -> Self {
38 Self {
39 lru_queue: LruQueue::new(),
40 memory_limit,
41 memory_used: 0,
42 cache_hits: HashMap::new(),
43 }
44 }
45
46 fn get(&mut self, k: &Path) -> Option<CachedFileMetadataEntry> {
49 self.lru_queue.get(k).cloned().inspect(|_| {
50 *self.cache_hits.entry(k.clone()).or_insert(0) += 1;
51 })
52 }
53
54 fn contains_key(&self, k: &Path) -> bool {
57 self.lru_queue.peek(k).is_some()
58 }
59
60 fn put(
64 &mut self,
65 key: Path,
66 value: CachedFileMetadataEntry,
67 ) -> Option<CachedFileMetadataEntry> {
68 let value_size = value.file_metadata.memory_size();
69
70 if value_size > self.memory_limit {
72 return None;
73 }
74
75 self.cache_hits.insert(key.clone(), 0);
76 let old_value = self.lru_queue.put(key, value);
78 self.memory_used += value_size;
79 if let Some(ref old_entry) = old_value {
80 self.memory_used -= old_entry.file_metadata.memory_size();
81 }
82
83 self.evict_entries();
84
85 old_value
86 }
87
88 fn evict_entries(&mut self) {
90 while self.memory_used > self.memory_limit {
91 if let Some(removed) = self.lru_queue.pop() {
92 self.memory_used -= removed.1.file_metadata.memory_size();
93 } else {
94 debug_assert!(
96 false,
97 "cache is empty while memory_used > memory_limit, cannot happen"
98 );
99 return;
100 }
101 }
102 }
103
104 fn remove(&mut self, k: &Path) -> Option<CachedFileMetadataEntry> {
106 if let Some(old_entry) = self.lru_queue.remove(k) {
107 self.memory_used -= old_entry.file_metadata.memory_size();
108 self.cache_hits.remove(k);
109 Some(old_entry)
110 } else {
111 None
112 }
113 }
114
115 fn len(&self) -> usize {
117 self.lru_queue.len()
118 }
119
120 fn clear(&mut self) {
122 self.lru_queue.clear();
123 self.memory_used = 0;
124 self.cache_hits.clear();
125 }
126}
127
128pub struct DefaultFilesMetadataCache {
144 state: Mutex<DefaultFilesMetadataCacheState>,
146}
147
148impl DefaultFilesMetadataCache {
149 pub fn new(memory_limit: usize) -> Self {
155 Self {
156 state: Mutex::new(DefaultFilesMetadataCacheState::new(memory_limit)),
157 }
158 }
159
160 pub fn memory_used(&self) -> usize {
162 let state = self.state.lock().unwrap();
163 state.memory_used
164 }
165}
166
167impl CacheAccessor<Path, CachedFileMetadataEntry> for DefaultFilesMetadataCache {
168 fn get(&self, key: &Path) -> Option<CachedFileMetadataEntry> {
169 let mut state = self.state.lock().unwrap();
170 state.get(key)
171 }
172
173 fn put(
174 &self,
175 key: &Path,
176 value: CachedFileMetadataEntry,
177 ) -> Option<CachedFileMetadataEntry> {
178 let mut state = self.state.lock().unwrap();
179 state.put(key.clone(), value)
180 }
181
182 fn remove(&self, k: &Path) -> Option<CachedFileMetadataEntry> {
183 let mut state = self.state.lock().unwrap();
184 state.remove(k)
185 }
186
187 fn contains_key(&self, k: &Path) -> bool {
188 let state = self.state.lock().unwrap();
189 state.contains_key(k)
190 }
191
192 fn len(&self) -> usize {
193 let state = self.state.lock().unwrap();
194 state.len()
195 }
196
197 fn clear(&self) {
198 let mut state = self.state.lock().unwrap();
199 state.clear();
200 }
201
202 fn name(&self) -> String {
203 "DefaultFilesMetadataCache".to_string()
204 }
205}
206
207impl FileMetadataCache for DefaultFilesMetadataCache {
208 fn cache_limit(&self) -> usize {
209 let state = self.state.lock().unwrap();
210 state.memory_limit
211 }
212
213 fn update_cache_limit(&self, limit: usize) {
214 let mut state = self.state.lock().unwrap();
215 state.memory_limit = limit;
216 state.evict_entries();
217 }
218
219 fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry> {
220 let state = self.state.lock().unwrap();
221 let mut entries = HashMap::<Path, FileMetadataCacheEntry>::new();
222
223 for (path, entry) in state.lru_queue.list_entries() {
224 entries.insert(
225 path.clone(),
226 FileMetadataCacheEntry {
227 object_meta: entry.meta.clone(),
228 size_bytes: entry.file_metadata.memory_size(),
229 hits: *state.cache_hits.get(path).expect("entry must exist"),
230 extra: entry.file_metadata.extra_info(),
231 },
232 );
233 }
234
235 entries
236 }
237}
238
239#[cfg(test)]
240mod tests {
241 use std::collections::HashMap;
242 use std::sync::Arc;
243
244 use crate::cache::CacheAccessor;
245 use crate::cache::cache_manager::{
246 CachedFileMetadataEntry, FileMetadata, FileMetadataCache, FileMetadataCacheEntry,
247 };
248 use crate::cache::file_metadata_cache::DefaultFilesMetadataCache;
249 use object_store::ObjectMeta;
250 use object_store::path::Path;
251
252 pub struct TestFileMetadata {
253 metadata: String,
254 }
255
256 impl FileMetadata for TestFileMetadata {
257 fn as_any(&self) -> &dyn std::any::Any {
258 self
259 }
260
261 fn memory_size(&self) -> usize {
262 self.metadata.len()
263 }
264
265 fn extra_info(&self) -> HashMap<String, String> {
266 HashMap::from([("extra_info".to_owned(), "abc".to_owned())])
267 }
268 }
269
270 fn create_test_object_meta(path: &str, size: usize) -> ObjectMeta {
271 ObjectMeta {
272 location: Path::from(path),
273 last_modified: chrono::DateTime::parse_from_rfc3339(
274 "2025-07-29T12:12:12+00:00",
275 )
276 .unwrap()
277 .into(),
278 size: size as u64,
279 e_tag: None,
280 version: None,
281 }
282 }
283
284 #[test]
285 fn test_default_file_metadata_cache() {
286 let object_meta = create_test_object_meta("test", 1024);
287
288 let metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
289 metadata: "retrieved_metadata".to_owned(),
290 });
291
292 let cache = DefaultFilesMetadataCache::new(1024 * 1024);
293
294 assert!(cache.get(&object_meta.location).is_none());
296
297 let cached_entry =
299 CachedFileMetadataEntry::new(object_meta.clone(), Arc::clone(&metadata));
300 cache.put(&object_meta.location, cached_entry);
301
302 assert!(cache.contains_key(&object_meta.location));
304 let result = cache.get(&object_meta.location).unwrap();
305 let test_file_metadata = Arc::downcast::<TestFileMetadata>(result.file_metadata);
306 assert!(test_file_metadata.is_ok());
307 assert_eq!(test_file_metadata.unwrap().metadata, "retrieved_metadata");
308
309 let result2 = cache.get(&object_meta.location).unwrap();
311 assert!(result2.is_valid_for(&object_meta));
312
313 let object_meta2 = create_test_object_meta("test", 2048);
315 let result3 = cache.get(&object_meta2.location).unwrap();
316 assert!(!result3.is_valid_for(&object_meta2));
318
319 let new_entry =
321 CachedFileMetadataEntry::new(object_meta2.clone(), Arc::clone(&metadata));
322 cache.put(&object_meta2.location, new_entry);
323
324 let result4 = cache.get(&object_meta2.location).unwrap();
325 assert_eq!(result4.meta.size, 2048);
326
327 cache.remove(&object_meta.location);
329 assert!(!cache.contains_key(&object_meta.location));
330
331 let object_meta3 = create_test_object_meta("test3", 100);
333 cache.put(
334 &object_meta.location,
335 CachedFileMetadataEntry::new(object_meta.clone(), Arc::clone(&metadata)),
336 );
337 cache.put(
338 &object_meta3.location,
339 CachedFileMetadataEntry::new(object_meta3.clone(), Arc::clone(&metadata)),
340 );
341 assert_eq!(cache.len(), 2);
342 cache.clear();
343 assert_eq!(cache.len(), 0);
344 }
345
346 fn generate_test_metadata_with_size(
347 path: &str,
348 size: usize,
349 ) -> (ObjectMeta, Arc<dyn FileMetadata>) {
350 let object_meta = ObjectMeta {
351 location: Path::from(path),
352 last_modified: chrono::Utc::now(),
353 size: size as u64,
354 e_tag: None,
355 version: None,
356 };
357 let metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
358 metadata: "a".repeat(size),
359 });
360
361 (object_meta, metadata)
362 }
363
364 #[test]
365 fn test_default_file_metadata_cache_with_limit() {
366 let cache = DefaultFilesMetadataCache::new(1000);
367 let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100);
368 let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 500);
369 let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300);
370
371 cache.put(
372 &object_meta1.location,
373 CachedFileMetadataEntry::new(object_meta1.clone(), metadata1),
374 );
375 cache.put(
376 &object_meta2.location,
377 CachedFileMetadataEntry::new(object_meta2.clone(), metadata2),
378 );
379 cache.put(
380 &object_meta3.location,
381 CachedFileMetadataEntry::new(object_meta3.clone(), metadata3),
382 );
383
384 assert_eq!(cache.len(), 3);
386 assert_eq!(cache.memory_used(), 900);
387 assert!(cache.contains_key(&object_meta1.location));
388 assert!(cache.contains_key(&object_meta2.location));
389 assert!(cache.contains_key(&object_meta3.location));
390
391 let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 200);
393 cache.put(
394 &object_meta4.location,
395 CachedFileMetadataEntry::new(object_meta4.clone(), metadata4),
396 );
397 assert_eq!(cache.len(), 3);
398 assert_eq!(cache.memory_used(), 1000);
399 assert!(!cache.contains_key(&object_meta1.location));
400 assert!(cache.contains_key(&object_meta4.location));
401
402 let _ = cache.get(&object_meta2.location);
405 let (object_meta5, metadata5) = generate_test_metadata_with_size("5", 100);
406 cache.put(
407 &object_meta5.location,
408 CachedFileMetadataEntry::new(object_meta5.clone(), metadata5),
409 );
410 assert_eq!(cache.len(), 3);
411 assert_eq!(cache.memory_used(), 800);
412 assert!(!cache.contains_key(&object_meta3.location));
413 assert!(cache.contains_key(&object_meta5.location));
414
415 let (object_meta6, metadata6) = generate_test_metadata_with_size("6", 1200);
417 cache.put(
418 &object_meta6.location,
419 CachedFileMetadataEntry::new(object_meta6.clone(), metadata6),
420 );
421 assert_eq!(cache.len(), 3);
422 assert_eq!(cache.memory_used(), 800);
423 assert!(!cache.contains_key(&object_meta6.location));
424
425 let (object_meta7, metadata7) = generate_test_metadata_with_size("7", 200);
427 cache.put(
428 &object_meta7.location,
429 CachedFileMetadataEntry::new(object_meta7.clone(), metadata7),
430 );
431 assert_eq!(cache.len(), 4);
432 assert_eq!(cache.memory_used(), 1000);
433 assert!(cache.contains_key(&object_meta7.location));
434
435 let (object_meta8, metadata8) = generate_test_metadata_with_size("8", 999);
437 cache.put(
438 &object_meta8.location,
439 CachedFileMetadataEntry::new(object_meta8.clone(), metadata8),
440 );
441 assert_eq!(cache.len(), 1);
442 assert_eq!(cache.memory_used(), 999);
443 assert!(cache.contains_key(&object_meta8.location));
444
445 let (object_meta9, metadata9) = generate_test_metadata_with_size("9", 300);
447 let (object_meta10, metadata10) = generate_test_metadata_with_size("10", 200);
448 let (object_meta11_v1, metadata11_v1) =
449 generate_test_metadata_with_size("11", 400);
450 cache.put(
451 &object_meta9.location,
452 CachedFileMetadataEntry::new(object_meta9.clone(), metadata9),
453 );
454 cache.put(
455 &object_meta10.location,
456 CachedFileMetadataEntry::new(object_meta10.clone(), metadata10),
457 );
458 cache.put(
459 &object_meta11_v1.location,
460 CachedFileMetadataEntry::new(object_meta11_v1.clone(), metadata11_v1),
461 );
462 assert_eq!(cache.memory_used(), 900);
463 assert_eq!(cache.len(), 3);
464 let (object_meta11_v2, metadata11_v2) =
465 generate_test_metadata_with_size("11", 500);
466 cache.put(
467 &object_meta11_v2.location,
468 CachedFileMetadataEntry::new(object_meta11_v2.clone(), metadata11_v2),
469 );
470 assert_eq!(cache.memory_used(), 1000);
471 assert_eq!(cache.len(), 3);
472 assert!(cache.contains_key(&object_meta9.location));
473 assert!(cache.contains_key(&object_meta10.location));
474 assert!(cache.contains_key(&object_meta11_v2.location));
475
476 let (object_meta11_v3, metadata11_v3) =
478 generate_test_metadata_with_size("11", 501);
479 cache.put(
480 &object_meta11_v3.location,
481 CachedFileMetadataEntry::new(object_meta11_v3.clone(), metadata11_v3),
482 );
483 assert_eq!(cache.memory_used(), 701);
484 assert_eq!(cache.len(), 2);
485 assert!(cache.contains_key(&object_meta10.location));
486 assert!(cache.contains_key(&object_meta11_v3.location));
487
488 cache.remove(&object_meta11_v3.location);
490 assert_eq!(cache.len(), 1);
491 assert_eq!(cache.memory_used(), 200);
492 assert!(cache.contains_key(&object_meta10.location));
493 assert!(!cache.contains_key(&object_meta11_v3.location));
494
495 cache.clear();
497 assert_eq!(cache.len(), 0);
498 assert_eq!(cache.memory_used(), 0);
499
500 let (object_meta12, metadata12) = generate_test_metadata_with_size("12", 300);
502 let (object_meta13, metadata13) = generate_test_metadata_with_size("13", 200);
503 let (object_meta14, metadata14) = generate_test_metadata_with_size("14", 500);
504 cache.put(
505 &object_meta12.location,
506 CachedFileMetadataEntry::new(object_meta12.clone(), metadata12),
507 );
508 cache.put(
509 &object_meta13.location,
510 CachedFileMetadataEntry::new(object_meta13.clone(), metadata13),
511 );
512 cache.put(
513 &object_meta14.location,
514 CachedFileMetadataEntry::new(object_meta14.clone(), metadata14),
515 );
516 assert_eq!(cache.len(), 3);
517 assert_eq!(cache.memory_used(), 1000);
518 cache.update_cache_limit(600);
519 assert_eq!(cache.len(), 1);
520 assert_eq!(cache.memory_used(), 500);
521 assert!(!cache.contains_key(&object_meta12.location));
522 assert!(!cache.contains_key(&object_meta13.location));
523 assert!(cache.contains_key(&object_meta14.location));
524 }
525
526 #[test]
527 fn test_default_file_metadata_cache_entries_info() {
528 let cache = DefaultFilesMetadataCache::new(1000);
529 let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100);
530 let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 200);
531 let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300);
532
533 cache.put(
535 &object_meta1.location,
536 CachedFileMetadataEntry::new(object_meta1.clone(), metadata1),
537 );
538 cache.put(
539 &object_meta2.location,
540 CachedFileMetadataEntry::new(object_meta2.clone(), metadata2),
541 );
542 cache.put(
543 &object_meta3.location,
544 CachedFileMetadataEntry::new(object_meta3.clone(), metadata3),
545 );
546 assert_eq!(
547 cache.list_entries(),
548 HashMap::from([
549 (
550 Path::from("1"),
551 FileMetadataCacheEntry {
552 object_meta: object_meta1.clone(),
553 size_bytes: 100,
554 hits: 0,
555 extra: HashMap::from([(
556 "extra_info".to_owned(),
557 "abc".to_owned()
558 )]),
559 }
560 ),
561 (
562 Path::from("2"),
563 FileMetadataCacheEntry {
564 object_meta: object_meta2.clone(),
565 size_bytes: 200,
566 hits: 0,
567 extra: HashMap::from([(
568 "extra_info".to_owned(),
569 "abc".to_owned()
570 )]),
571 }
572 ),
573 (
574 Path::from("3"),
575 FileMetadataCacheEntry {
576 object_meta: object_meta3.clone(),
577 size_bytes: 300,
578 hits: 0,
579 extra: HashMap::from([(
580 "extra_info".to_owned(),
581 "abc".to_owned()
582 )]),
583 }
584 )
585 ])
586 );
587
588 let _ = cache.get(&object_meta1.location);
590 assert_eq!(
591 cache.list_entries(),
592 HashMap::from([
593 (
594 Path::from("1"),
595 FileMetadataCacheEntry {
596 object_meta: object_meta1.clone(),
597 size_bytes: 100,
598 hits: 1,
599 extra: HashMap::from([(
600 "extra_info".to_owned(),
601 "abc".to_owned()
602 )]),
603 }
604 ),
605 (
606 Path::from("2"),
607 FileMetadataCacheEntry {
608 object_meta: object_meta2.clone(),
609 size_bytes: 200,
610 hits: 0,
611 extra: HashMap::from([(
612 "extra_info".to_owned(),
613 "abc".to_owned()
614 )]),
615 }
616 ),
617 (
618 Path::from("3"),
619 FileMetadataCacheEntry {
620 object_meta: object_meta3.clone(),
621 size_bytes: 300,
622 hits: 0,
623 extra: HashMap::from([(
624 "extra_info".to_owned(),
625 "abc".to_owned()
626 )]),
627 }
628 )
629 ])
630 );
631
632 let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 600);
634 cache.put(
635 &object_meta4.location,
636 CachedFileMetadataEntry::new(object_meta4.clone(), metadata4),
637 );
638 assert_eq!(
639 cache.list_entries(),
640 HashMap::from([
641 (
642 Path::from("1"),
643 FileMetadataCacheEntry {
644 object_meta: object_meta1.clone(),
645 size_bytes: 100,
646 hits: 1,
647 extra: HashMap::from([(
648 "extra_info".to_owned(),
649 "abc".to_owned()
650 )]),
651 }
652 ),
653 (
654 Path::from("3"),
655 FileMetadataCacheEntry {
656 object_meta: object_meta3.clone(),
657 size_bytes: 300,
658 hits: 0,
659 extra: HashMap::from([(
660 "extra_info".to_owned(),
661 "abc".to_owned()
662 )]),
663 }
664 ),
665 (
666 Path::from("4"),
667 FileMetadataCacheEntry {
668 object_meta: object_meta4.clone(),
669 size_bytes: 600,
670 hits: 0,
671 extra: HashMap::from([(
672 "extra_info".to_owned(),
673 "abc".to_owned()
674 )]),
675 }
676 )
677 ])
678 );
679
680 let (object_meta1_new, metadata1_new) = generate_test_metadata_with_size("1", 50);
682 cache.put(
683 &object_meta1_new.location,
684 CachedFileMetadataEntry::new(object_meta1_new.clone(), metadata1_new),
685 );
686 assert_eq!(
687 cache.list_entries(),
688 HashMap::from([
689 (
690 Path::from("1"),
691 FileMetadataCacheEntry {
692 object_meta: object_meta1_new.clone(),
693 size_bytes: 50,
694 hits: 0,
695 extra: HashMap::from([(
696 "extra_info".to_owned(),
697 "abc".to_owned()
698 )]),
699 }
700 ),
701 (
702 Path::from("3"),
703 FileMetadataCacheEntry {
704 object_meta: object_meta3.clone(),
705 size_bytes: 300,
706 hits: 0,
707 extra: HashMap::from([(
708 "extra_info".to_owned(),
709 "abc".to_owned()
710 )]),
711 }
712 ),
713 (
714 Path::from("4"),
715 FileMetadataCacheEntry {
716 object_meta: object_meta4.clone(),
717 size_bytes: 600,
718 hits: 0,
719 extra: HashMap::from([(
720 "extra_info".to_owned(),
721 "abc".to_owned()
722 )]),
723 }
724 )
725 ])
726 );
727
728 cache.remove(&object_meta4.location);
730 assert_eq!(
731 cache.list_entries(),
732 HashMap::from([
733 (
734 Path::from("1"),
735 FileMetadataCacheEntry {
736 object_meta: object_meta1_new.clone(),
737 size_bytes: 50,
738 hits: 0,
739 extra: HashMap::from([(
740 "extra_info".to_owned(),
741 "abc".to_owned()
742 )]),
743 }
744 ),
745 (
746 Path::from("3"),
747 FileMetadataCacheEntry {
748 object_meta: object_meta3.clone(),
749 size_bytes: 300,
750 hits: 0,
751 extra: HashMap::from([(
752 "extra_info".to_owned(),
753 "abc".to_owned()
754 )]),
755 }
756 )
757 ])
758 );
759
760 cache.clear();
762 assert_eq!(cache.list_entries(), HashMap::from([]));
763 }
764}