1use std::{
19 collections::HashMap,
20 sync::{Arc, Mutex},
21};
22
23use object_store::{ObjectMeta, path::Path};
24
25use crate::cache::{
26 CacheAccessor,
27 cache_manager::{FileMetadata, FileMetadataCache, FileMetadataCacheEntry},
28 lru_queue::LruQueue,
29};
30
31struct DefaultFilesMetadataCacheState {
33 lru_queue: LruQueue<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
34 memory_limit: usize,
35 memory_used: usize,
36 cache_hits: HashMap<Path, usize>,
37}
38
39impl DefaultFilesMetadataCacheState {
40 fn new(memory_limit: usize) -> Self {
41 Self {
42 lru_queue: LruQueue::new(),
43 memory_limit,
44 memory_used: 0,
45 cache_hits: HashMap::new(),
46 }
47 }
48
49 fn get(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
53 self.lru_queue
54 .get(&k.location)
55 .map(|(object_meta, metadata)| {
56 if object_meta.size != k.size
57 || object_meta.last_modified != k.last_modified
58 {
59 None
60 } else {
61 *self.cache_hits.entry(k.location.clone()).or_insert(0) += 1;
62 Some(Arc::clone(metadata))
63 }
64 })
65 .unwrap_or(None)
66 }
67
68 fn contains_key(&self, k: &ObjectMeta) -> bool {
72 self.lru_queue
73 .peek(&k.location)
74 .map(|(object_meta, _)| {
75 object_meta.size == k.size && object_meta.last_modified == k.last_modified
76 })
77 .unwrap_or(false)
78 }
79
80 fn put(
84 &mut self,
85 key: ObjectMeta,
86 value: Arc<dyn FileMetadata>,
87 ) -> Option<Arc<dyn FileMetadata>> {
88 let value_size = value.memory_size();
89
90 if value_size > self.memory_limit {
92 return None;
93 }
94
95 self.cache_hits.insert(key.location.clone(), 0);
96 let old_value = self.lru_queue.put(key.location.clone(), (key, value));
98 self.memory_used += value_size;
99 if let Some((_, ref old_metadata)) = old_value {
100 self.memory_used -= old_metadata.memory_size();
101 }
102
103 self.evict_entries();
104
105 old_value.map(|v| v.1)
106 }
107
108 fn evict_entries(&mut self) {
110 while self.memory_used > self.memory_limit {
111 if let Some(removed) = self.lru_queue.pop() {
112 let metadata: Arc<dyn FileMetadata> = removed.1.1;
113 self.memory_used -= metadata.memory_size();
114 } else {
115 debug_assert!(
117 false,
118 "cache is empty while memory_used > memory_limit, cannot happen"
119 );
120 return;
121 }
122 }
123 }
124
125 fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
127 if let Some((_, old_metadata)) = self.lru_queue.remove(&k.location) {
128 self.memory_used -= old_metadata.memory_size();
129 self.cache_hits.remove(&k.location);
130 Some(old_metadata)
131 } else {
132 None
133 }
134 }
135
136 fn len(&self) -> usize {
138 self.lru_queue.len()
139 }
140
141 fn clear(&mut self) {
143 self.lru_queue.clear();
144 self.memory_used = 0;
145 self.cache_hits.clear();
146 }
147}
148
149pub struct DefaultFilesMetadataCache {
169 state: Mutex<DefaultFilesMetadataCacheState>,
171}
172
173impl DefaultFilesMetadataCache {
174 pub fn new(memory_limit: usize) -> Self {
180 Self {
181 state: Mutex::new(DefaultFilesMetadataCacheState::new(memory_limit)),
182 }
183 }
184
185 pub fn memory_used(&self) -> usize {
187 let state = self.state.lock().unwrap();
188 state.memory_used
189 }
190}
191
192impl FileMetadataCache for DefaultFilesMetadataCache {
193 fn cache_limit(&self) -> usize {
194 let state = self.state.lock().unwrap();
195 state.memory_limit
196 }
197
198 fn update_cache_limit(&self, limit: usize) {
199 let mut state = self.state.lock().unwrap();
200 state.memory_limit = limit;
201 state.evict_entries();
202 }
203
204 fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry> {
205 let state = self.state.lock().unwrap();
206 let mut entries = HashMap::<Path, FileMetadataCacheEntry>::new();
207
208 for (path, (object_meta, metadata)) in state.lru_queue.list_entries() {
209 entries.insert(
210 path.clone(),
211 FileMetadataCacheEntry {
212 object_meta: object_meta.clone(),
213 size_bytes: metadata.memory_size(),
214 hits: *state.cache_hits.get(path).expect("entry must exist"),
215 extra: metadata.extra_info(),
216 },
217 );
218 }
219
220 entries
221 }
222}
223
224impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for DefaultFilesMetadataCache {
225 type Extra = ObjectMeta;
226
227 fn get(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
228 let mut state = self.state.lock().unwrap();
229 state.get(k)
230 }
231
232 fn get_with_extra(
233 &self,
234 k: &ObjectMeta,
235 _e: &Self::Extra,
236 ) -> Option<Arc<dyn FileMetadata>> {
237 self.get(k)
238 }
239
240 fn put(
241 &self,
242 key: &ObjectMeta,
243 value: Arc<dyn FileMetadata>,
244 ) -> Option<Arc<dyn FileMetadata>> {
245 let mut state = self.state.lock().unwrap();
246 state.put(key.clone(), value)
247 }
248
249 fn put_with_extra(
250 &self,
251 key: &ObjectMeta,
252 value: Arc<dyn FileMetadata>,
253 _e: &Self::Extra,
254 ) -> Option<Arc<dyn FileMetadata>> {
255 self.put(key, value)
256 }
257
258 fn remove(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
259 let mut state = self.state.lock().unwrap();
260 state.remove(k)
261 }
262
263 fn contains_key(&self, k: &ObjectMeta) -> bool {
264 let state = self.state.lock().unwrap();
265 state.contains_key(k)
266 }
267
268 fn len(&self) -> usize {
269 let state = self.state.lock().unwrap();
270 state.len()
271 }
272
273 fn clear(&self) {
274 let mut state = self.state.lock().unwrap();
275 state.clear();
276 }
277
278 fn name(&self) -> String {
279 "DefaultFilesMetadataCache".to_string()
280 }
281}
282
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::cache::CacheAccessor;
    use crate::cache::cache_manager::{
        FileMetadata, FileMetadataCache, FileMetadataCacheEntry,
    };
    use crate::cache::file_metadata_cache::DefaultFilesMetadataCache;
    use object_store::ObjectMeta;
    use object_store::path::Path;

    /// Minimal [`FileMetadata`] whose memory size is the length of its string payload.
    pub struct TestFileMetadata {
        metadata: String,
    }

    impl FileMetadata for TestFileMetadata {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn memory_size(&self) -> usize {
            self.metadata.len()
        }

        fn extra_info(&self) -> HashMap<String, String> {
            HashMap::from([("extra_info".to_owned(), "abc".to_owned())])
        }
    }

    /// Builds the `(location, entry)` pair that `list_entries` is expected to report for a
    /// cached [`TestFileMetadata`] stored under `object_meta`.
    fn expected_entry(
        object_meta: &ObjectMeta,
        size_bytes: usize,
        hits: usize,
    ) -> (Path, FileMetadataCacheEntry) {
        (
            object_meta.location.clone(),
            FileMetadataCacheEntry {
                object_meta: object_meta.clone(),
                size_bytes,
                hits,
                extra: HashMap::from([("extra_info".to_owned(), "abc".to_owned())]),
            },
        )
    }

    #[test]
    fn test_default_file_metadata_cache() {
        let object_meta = ObjectMeta {
            location: Path::from("test"),
            last_modified: chrono::DateTime::parse_from_rfc3339(
                "2025-07-29T12:12:12+00:00",
            )
            .unwrap()
            .into(),
            size: 1024,
            e_tag: None,
            version: None,
        };

        let metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
            metadata: "retrieved_metadata".to_owned(),
        });

        let cache = DefaultFilesMetadataCache::new(1024 * 1024);
        assert!(cache.get(&object_meta).is_none());

        // put
        cache.put(&object_meta, Arc::clone(&metadata));

        // get and contains of a valid entry
        assert!(cache.contains_key(&object_meta));
        let value = cache.get(&object_meta);
        assert!(value.is_some());
        let test_file_metadata = Arc::downcast::<TestFileMetadata>(value.unwrap());
        assert!(test_file_metadata.is_ok());
        assert_eq!(test_file_metadata.unwrap().metadata, "retrieved_metadata");

        // a different file size invalidates the entry
        let mut object_meta2 = object_meta.clone();
        object_meta2.size = 2048;
        assert!(cache.get(&object_meta2).is_none());
        assert!(!cache.contains_key(&object_meta2));

        // a different modification time invalidates the entry
        let mut object_meta2 = object_meta.clone();
        object_meta2.last_modified =
            chrono::DateTime::parse_from_rfc3339("2025-07-29T13:13:13+00:00")
                .unwrap()
                .into();
        assert!(cache.get(&object_meta2).is_none());
        assert!(!cache.contains_key(&object_meta2));

        // a different location is a different key
        let mut object_meta2 = object_meta.clone();
        object_meta2.location = Path::from("test2");
        assert!(cache.get(&object_meta2).is_none());
        assert!(!cache.contains_key(&object_meta2));

        // remove
        cache.remove(&object_meta);
        assert!(cache.get(&object_meta).is_none());
        assert!(!cache.contains_key(&object_meta));

        // len and clear
        cache.put(&object_meta, Arc::clone(&metadata));
        cache.put(&object_meta2, metadata);
        assert_eq!(cache.len(), 2);
        cache.clear();
        assert_eq!(cache.len(), 0);
    }

    /// Creates an [`ObjectMeta`] for `path` plus test metadata whose `memory_size()` is `size`.
    fn generate_test_metadata_with_size(
        path: &str,
        size: usize,
    ) -> (ObjectMeta, Arc<dyn FileMetadata>) {
        let object_meta = ObjectMeta {
            location: Path::from(path),
            last_modified: chrono::Utc::now(),
            size: size as u64,
            e_tag: None,
            version: None,
        };
        let metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
            metadata: "a".repeat(size),
        });

        (object_meta, metadata)
    }

    #[test]
    fn test_default_file_metadata_cache_with_limit() {
        let cache = DefaultFilesMetadataCache::new(1000);
        let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100);
        let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 500);
        let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300);

        cache.put(&object_meta1, metadata1);
        cache.put(&object_meta2, metadata2);
        cache.put(&object_meta3, metadata3);

        // all entries fit
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.memory_used(), 900);
        assert!(cache.contains_key(&object_meta1));
        assert!(cache.contains_key(&object_meta2));
        assert!(cache.contains_key(&object_meta3));

        // going over the limit evicts the least recently used entry ("1")
        let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 200);
        cache.put(&object_meta4, metadata4);
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.memory_used(), 1000);
        assert!(!cache.contains_key(&object_meta1));
        assert!(cache.contains_key(&object_meta4));

        // a `get` marks "2" as recently used, so "3" is evicted next
        cache.get(&object_meta2);
        let (object_meta5, metadata5) = generate_test_metadata_with_size("5", 100);
        cache.put(&object_meta5, metadata5);
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.memory_used(), 800);
        assert!(!cache.contains_key(&object_meta3));
        assert!(cache.contains_key(&object_meta5));

        // a value larger than the whole limit is rejected and changes nothing
        let (object_meta6, metadata6) = generate_test_metadata_with_size("6", 1200);
        cache.put(&object_meta6, metadata6);
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.memory_used(), 800);
        assert!(!cache.contains_key(&object_meta6));

        // filling the cache exactly up to the limit evicts nothing
        let (object_meta7, metadata7) = generate_test_metadata_with_size("7", 200);
        cache.put(&object_meta7, metadata7);
        assert_eq!(cache.len(), 4);
        assert_eq!(cache.memory_used(), 1000);
        assert!(cache.contains_key(&object_meta7));

        // a single large value can evict several entries
        let (object_meta8, metadata8) = generate_test_metadata_with_size("8", 999);
        cache.put(&object_meta8, metadata8);
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.memory_used(), 999);
        assert!(cache.contains_key(&object_meta8));

        // replacing an existing key within the limit updates the accounting in place
        let (object_meta9, metadata9) = generate_test_metadata_with_size("9", 300);
        let (object_meta10, metadata10) = generate_test_metadata_with_size("10", 200);
        let (object_meta11_v1, metadata11_v1) =
            generate_test_metadata_with_size("11", 400);
        cache.put(&object_meta9, metadata9);
        cache.put(&object_meta10, metadata10);
        cache.put(&object_meta11_v1, metadata11_v1);
        assert_eq!(cache.memory_used(), 900);
        assert_eq!(cache.len(), 3);
        let (object_meta11_v2, metadata11_v2) =
            generate_test_metadata_with_size("11", 500);
        cache.put(&object_meta11_v2, metadata11_v2);
        assert_eq!(cache.memory_used(), 1000);
        assert_eq!(cache.len(), 3);
        assert!(cache.contains_key(&object_meta9));
        assert!(cache.contains_key(&object_meta10));
        assert!(cache.contains_key(&object_meta11_v2));
        assert!(!cache.contains_key(&object_meta11_v1));

        // replacing an existing key beyond the limit evicts the least recently used entry
        let (object_meta11_v3, metadata11_v3) =
            generate_test_metadata_with_size("11", 501);
        cache.put(&object_meta11_v3, metadata11_v3);
        assert_eq!(cache.memory_used(), 701);
        assert_eq!(cache.len(), 2);
        assert!(cache.contains_key(&object_meta10));
        assert!(cache.contains_key(&object_meta11_v3));
        assert!(!cache.contains_key(&object_meta11_v2));

        // removing an entry releases its memory
        cache.remove(&object_meta11_v3);
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.memory_used(), 200);
        assert!(cache.contains_key(&object_meta10));
        assert!(!cache.contains_key(&object_meta11_v3));

        // clearing resets the accounting
        cache.clear();
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.memory_used(), 0);

        // lowering the limit evicts entries until the cache fits again
        let (object_meta12, metadata12) = generate_test_metadata_with_size("12", 300);
        let (object_meta13, metadata13) = generate_test_metadata_with_size("13", 200);
        let (object_meta14, metadata14) = generate_test_metadata_with_size("14", 500);
        cache.put(&object_meta12, metadata12);
        cache.put(&object_meta13, metadata13);
        cache.put(&object_meta14, metadata14);
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.memory_used(), 1000);
        cache.update_cache_limit(600);
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.memory_used(), 500);
        assert!(!cache.contains_key(&object_meta12));
        assert!(!cache.contains_key(&object_meta13));
        assert!(cache.contains_key(&object_meta14));
    }

    #[test]
    fn test_default_file_metadata_cache_entries_info() {
        let cache = DefaultFilesMetadataCache::new(1000);
        let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100);
        let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 200);
        let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300);

        // freshly inserted entries start with zero hits
        cache.put(&object_meta1, metadata1);
        cache.put(&object_meta2, metadata2);
        cache.put(&object_meta3, metadata3);
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                expected_entry(&object_meta1, 100, 0),
                expected_entry(&object_meta2, 200, 0),
                expected_entry(&object_meta3, 300, 0),
            ])
        );

        // a successful `get` increments the hit counter of that entry only
        cache.get(&object_meta1);
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                expected_entry(&object_meta1, 100, 1),
                expected_entry(&object_meta2, 200, 0),
                expected_entry(&object_meta3, 300, 0),
            ])
        );

        // inserting "4" exceeds the limit and evicts "2", the least recently used entry
        let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 600);
        cache.put(&object_meta4, metadata4);
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                expected_entry(&object_meta1, 100, 1),
                expected_entry(&object_meta3, 300, 0),
                expected_entry(&object_meta4, 600, 0),
            ])
        );

        // replacing "1" stores the new object meta and resets its hit counter
        let (object_meta1_new, metadata1_new) = generate_test_metadata_with_size("1", 50);
        cache.put(&object_meta1_new, metadata1_new);
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                expected_entry(&object_meta1_new, 50, 0),
                expected_entry(&object_meta3, 300, 0),
                expected_entry(&object_meta4, 600, 0),
            ])
        );

        // removed entries are no longer listed
        cache.remove(&object_meta4);
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                expected_entry(&object_meta1_new, 50, 0),
                expected_entry(&object_meta3, 300, 0),
            ])
        );

        // clearing leaves nothing to list
        cache.clear();
        assert_eq!(cache.list_entries(), HashMap::from([]));
    }
}