1use std::mem::size_of;
19use std::{
20 collections::HashMap,
21 sync::{Arc, Mutex},
22 time::Duration,
23};
24
25use datafusion_common::TableReference;
26use datafusion_common::instant::Instant;
27use object_store::{ObjectMeta, path::Path};
28
29use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQueue};
30
/// Abstraction over "what time is it now", so the cache's clock can be swapped out.
///
/// The `Send + Sync + 'static` bounds allow an implementation to be stored
/// behind an `Arc<dyn TimeProvider>` and shared between threads.
pub trait TimeProvider: Send + Sync + 'static {
    /// Returns the instant this provider considers to be "now".
    fn now(&self) -> Instant;
}
34
/// Zero-sized [`TimeProvider`] used by [`DefaultListFilesCache::new`] as the
/// cache's clock.
#[derive(Debug, Default)]
pub struct SystemTimeProvider;
37
impl TimeProvider for SystemTimeProvider {
    /// Returns the current time as reported by [`Instant::now`].
    fn now(&self) -> Instant {
        Instant::now()
    }
}
43
/// Memory-bounded cache of file listings, keyed by [`TableScopedPath`].
///
/// All mutable bookkeeping lives in [`DefaultListFilesCacheState`] behind a
/// [`Mutex`], so the cache can be used through a shared reference.
pub struct DefaultListFilesCache {
    /// Entries, memory accounting, memory limit and TTL, guarded by a mutex.
    state: Mutex<DefaultListFilesCacheState>,
    /// Clock consulted when stamping new entries and when checking expiry.
    time_provider: Arc<dyn TimeProvider>,
}
64
65impl Default for DefaultListFilesCache {
66 fn default() -> Self {
67 Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None)
68 }
69}
70
71impl DefaultListFilesCache {
72 pub fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
78 Self {
79 state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)),
80 time_provider: Arc::new(SystemTimeProvider),
81 }
82 }
83
84 #[cfg(test)]
85 pub(crate) fn with_time_provider(mut self, provider: Arc<dyn TimeProvider>) -> Self {
86 self.time_provider = provider;
87 self
88 }
89
90 pub fn cache_limit(&self) -> usize {
92 self.state.lock().unwrap().memory_limit
93 }
94
95 pub fn update_cache_limit(&self, limit: usize) {
97 let mut state = self.state.lock().unwrap();
98 state.memory_limit = limit;
99 state.evict_entries();
100 }
101
102 pub fn cache_ttl(&self) -> Option<Duration> {
104 self.state.lock().unwrap().ttl
105 }
106}
107
/// A single cached listing together with its bookkeeping data.
#[derive(Clone, PartialEq, Debug)]
pub struct ListFilesEntry {
    /// The cached object metadata, shared via `Arc`.
    pub metas: Arc<Vec<ObjectMeta>>,
    /// Size charged against the cache's memory limit for this entry, in bytes.
    pub size_bytes: usize,
    /// Instant after which the entry is treated as expired; `None` means the
    /// entry was stored without a TTL.
    pub expires: Option<Instant>,
}
114
115impl ListFilesEntry {
116 fn try_new(
117 metas: Arc<Vec<ObjectMeta>>,
118 ttl: Option<Duration>,
119 now: Instant,
120 ) -> Option<Self> {
121 let size_bytes = (metas.capacity() * size_of::<ObjectMeta>())
122 + metas.iter().map(meta_heap_bytes).reduce(|acc, b| acc + b)?;
123
124 Some(Self {
125 metas,
126 size_bytes,
127 expires: ttl.map(|t| now + t),
128 })
129 }
130}
131
132fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
134 let mut size = object_meta.location.as_ref().len();
135
136 if let Some(e) = &object_meta.e_tag {
137 size += e.len();
138 }
139 if let Some(v) = &object_meta.version {
140 size += v.len();
141 }
142
143 size
144}
145
/// Default memory limit for the list-files cache: 1 MiB, expressed in bytes.
pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024;

/// Default TTL for the list-files cache; `None` means entries are stored
/// without an expiry time.
pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None;

/// Cache key: a path qualified by the table it was listed for.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct TableScopedPath {
    /// Table the listing belongs to, if any.
    pub table: Option<TableReference>,
    /// Base path that was listed.
    pub path: Path,
}
157
/// Mutable state of the list-files cache: the entries plus the memory
/// accounting and configuration needed to evict and expire them.
pub struct DefaultListFilesCacheState {
    /// Cached listings; entries are popped from this queue when evicting.
    lru_queue: LruQueue<TableScopedPath, ListFilesEntry>,
    /// Maximum total of `size_bytes` the cache may hold.
    memory_limit: usize,
    /// Sum of `size_bytes` over all entries currently stored.
    memory_used: usize,
    /// TTL applied to entries at insertion time; `None` disables expiry.
    ttl: Option<Duration>,
}
165
166impl Default for DefaultListFilesCacheState {
167 fn default() -> Self {
168 Self {
169 lru_queue: LruQueue::new(),
170 memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
171 memory_used: 0,
172 ttl: DEFAULT_LIST_FILES_CACHE_TTL,
173 }
174 }
175}
176
177impl DefaultListFilesCacheState {
178 fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
179 Self {
180 lru_queue: LruQueue::new(),
181 memory_limit,
182 memory_used: 0,
183 ttl,
184 }
185 }
186
187 fn get_with_prefix(
207 &mut self,
208 table_scoped_base_path: &TableScopedPath,
209 prefix: Option<&Path>,
210 now: Instant,
211 ) -> Option<Arc<Vec<ObjectMeta>>> {
212 let entry = self.lru_queue.get(table_scoped_base_path)?;
213
214 if let Some(exp) = entry.expires
216 && now > exp
217 {
218 self.remove(table_scoped_base_path);
219 return None;
220 }
221
222 let Some(prefix) = prefix else {
224 return Some(Arc::clone(&entry.metas));
225 };
226
227 let table_base = &table_scoped_base_path.path;
229 let mut parts: Vec<_> = table_base.parts().collect();
230 parts.extend(prefix.parts());
231 let full_prefix = Path::from_iter(parts);
232 let full_prefix_str = full_prefix.as_ref();
233
234 let filtered: Vec<ObjectMeta> = entry
236 .metas
237 .iter()
238 .filter(|meta| meta.location.as_ref().starts_with(full_prefix_str))
239 .cloned()
240 .collect();
241
242 if filtered.is_empty() {
243 None
244 } else {
245 Some(Arc::new(filtered))
246 }
247 }
248
249 fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool {
255 let Some(entry) = self.lru_queue.peek(k) else {
256 return false;
257 };
258
259 match entry.expires {
260 Some(exp) if now > exp => {
261 self.remove(k);
262 false
263 }
264 _ => true,
265 }
266 }
267
268 fn put(
274 &mut self,
275 key: &TableScopedPath,
276 value: Arc<Vec<ObjectMeta>>,
277 now: Instant,
278 ) -> Option<Arc<Vec<ObjectMeta>>> {
279 let entry = ListFilesEntry::try_new(value, self.ttl, now)?;
280 let entry_size = entry.size_bytes;
281
282 if entry_size > self.memory_limit {
284 return None;
285 }
286
287 let old_value = self.lru_queue.put(key.clone(), entry);
289 self.memory_used += entry_size;
290
291 if let Some(entry) = &old_value {
292 self.memory_used -= entry.size_bytes;
293 }
294
295 self.evict_entries();
296
297 old_value.map(|v| v.metas)
298 }
299
300 fn evict_entries(&mut self) {
302 while self.memory_used > self.memory_limit {
303 if let Some(removed) = self.lru_queue.pop() {
304 self.memory_used -= removed.1.size_bytes;
305 } else {
306 debug_assert!(
308 false,
309 "cache is empty while memory_used > memory_limit, cannot happen"
310 );
311 return;
312 }
313 }
314 }
315
316 fn remove(&mut self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
318 if let Some(entry) = self.lru_queue.remove(k) {
319 self.memory_used -= entry.size_bytes;
320 Some(entry.metas)
321 } else {
322 None
323 }
324 }
325
326 fn len(&self) -> usize {
328 self.lru_queue.len()
329 }
330
331 fn clear(&mut self) {
333 self.lru_queue.clear();
334 self.memory_used = 0;
335 }
336}
337
338impl ListFilesCache for DefaultListFilesCache {
339 fn cache_limit(&self) -> usize {
340 let state = self.state.lock().unwrap();
341 state.memory_limit
342 }
343
344 fn cache_ttl(&self) -> Option<Duration> {
345 let state = self.state.lock().unwrap();
346 state.ttl
347 }
348
349 fn update_cache_limit(&self, limit: usize) {
350 let mut state = self.state.lock().unwrap();
351 state.memory_limit = limit;
352 state.evict_entries();
353 }
354
355 fn update_cache_ttl(&self, ttl: Option<Duration>) {
356 let mut state = self.state.lock().unwrap();
357 state.ttl = ttl;
358 state.evict_entries();
359 }
360
361 fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry> {
362 let state = self.state.lock().unwrap();
363 let mut entries = HashMap::<TableScopedPath, ListFilesEntry>::new();
364 for (path, entry) in state.lru_queue.list_entries() {
365 entries.insert(path.clone(), entry.clone());
366 }
367 entries
368 }
369
370 fn drop_table_entries(
371 &self,
372 table_ref: &Option<TableReference>,
373 ) -> datafusion_common::Result<()> {
374 let mut state = self.state.lock().unwrap();
375 let mut table_paths = vec![];
376 for (path, _) in state.lru_queue.list_entries() {
377 if path.table == *table_ref {
378 table_paths.push(path.clone());
379 }
380 }
381 for path in table_paths {
382 state.remove(&path);
383 }
384 Ok(())
385 }
386}
387
388impl CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
389 type Extra = Option<Path>;
390
391 fn get(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
395 self.get_with_extra(k, &None)
396 }
397
398 fn get_with_extra(
412 &self,
413 table_scoped_path: &TableScopedPath,
414 prefix: &Self::Extra,
415 ) -> Option<Arc<Vec<ObjectMeta>>> {
416 let mut state = self.state.lock().unwrap();
417 let now = self.time_provider.now();
418 state.get_with_prefix(table_scoped_path, prefix.as_ref(), now)
419 }
420
421 fn put(
422 &self,
423 key: &TableScopedPath,
424 value: Arc<Vec<ObjectMeta>>,
425 ) -> Option<Arc<Vec<ObjectMeta>>> {
426 let mut state = self.state.lock().unwrap();
427 let now = self.time_provider.now();
428 state.put(key, value, now)
429 }
430
431 fn put_with_extra(
432 &self,
433 key: &TableScopedPath,
434 value: Arc<Vec<ObjectMeta>>,
435 _e: &Self::Extra,
436 ) -> Option<Arc<Vec<ObjectMeta>>> {
437 self.put(key, value)
438 }
439
440 fn remove(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
441 let mut state = self.state.lock().unwrap();
442 state.remove(k)
443 }
444
445 fn contains_key(&self, k: &TableScopedPath) -> bool {
446 let mut state = self.state.lock().unwrap();
447 let now = self.time_provider.now();
448 state.contains_key(k, now)
449 }
450
451 fn len(&self) -> usize {
452 let state = self.state.lock().unwrap();
453 state.len()
454 }
455
456 fn clear(&self) {
457 let mut state = self.state.lock().unwrap();
458 state.clear();
459 }
460
461 fn name(&self) -> String {
462 String::from("DefaultListFilesCache")
463 }
464}
465
466#[cfg(test)]
467mod tests {
468 use super::*;
469 use chrono::DateTime;
470
/// Manually advanced clock for tests: reports `base + offset`.
struct MockTimeProvider {
    /// Instant captured when the mock was created.
    base: Instant,
    /// Total time the test has advanced the clock by so far.
    offset: Mutex<Duration>,
}
475
476 impl MockTimeProvider {
477 fn new() -> Self {
478 Self {
479 base: Instant::now(),
480 offset: Mutex::new(Duration::ZERO),
481 }
482 }
483
484 fn inc(&self, duration: Duration) {
485 let mut offset = self.offset.lock().unwrap();
486 *offset += duration;
487 }
488 }
489
490 impl TimeProvider for MockTimeProvider {
491 fn now(&self) -> Instant {
492 self.base + *self.offset.lock().unwrap()
493 }
494 }
495
496 fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta {
498 let location_str = if location_size > path.len() {
500 format!("{}{}", path, "0".repeat(location_size - path.len()))
501 } else {
502 path.to_string()
503 };
504
505 ObjectMeta {
506 location: Path::from(location_str),
507 last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
508 .unwrap()
509 .into(),
510 size: 1024,
511 e_tag: None,
512 version: None,
513 }
514 }
515
516 fn create_test_list_files_entry(
518 path: &str,
519 count: usize,
520 meta_size: usize,
521 ) -> (Path, Arc<Vec<ObjectMeta>>, usize) {
522 let metas: Vec<ObjectMeta> = (0..count)
523 .map(|i| create_test_object_meta(&format!("file{i}"), meta_size))
524 .collect();
525 let metas = Arc::new(metas);
526
527 let size = (metas.capacity() * size_of::<ObjectMeta>())
529 + metas.iter().map(meta_heap_bytes).sum::<usize>();
530
531 (Path::from(path), metas, size)
532 }
533
#[test]
fn test_basic_operations() {
    let cache = DefaultListFilesCache::default();
    let table = Some(TableReference::from("table"));
    let scoped = |path: Path| TableScopedPath {
        table: table.clone(),
        path,
    };
    let key = scoped(Path::from("test_path"));

    // A fresh cache holds nothing.
    assert!(cache.get(&key).is_none());
    assert!(!cache.contains_key(&key));
    assert_eq!(cache.len(), 0);

    // Insert a single-file listing and read it back.
    let meta = create_test_object_meta("file1", 50);
    let listing = Arc::new(vec![meta.clone()]);
    cache.put(&key, Arc::clone(&listing));

    assert!(cache.contains_key(&key));
    assert_eq!(cache.len(), 1);
    let fetched = cache.get(&key).unwrap();
    assert_eq!(fetched.len(), 1);
    assert_eq!(fetched[0].location, meta.location);

    // Removing returns the stored listing and empties the cache.
    let taken = cache.remove(&key).unwrap();
    assert_eq!(taken.len(), 1);
    assert!(!cache.contains_key(&key));
    assert_eq!(cache.len(), 0);

    // Two entries at once, verified through `list_entries`.
    let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
    let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50);
    let key1 = scoped(path1);
    let key2 = scoped(path2);
    cache.put(&key1, Arc::clone(&value1));
    cache.put(&key2, Arc::clone(&value2));
    assert_eq!(cache.len(), 2);

    let expected = HashMap::from([
        (
            key1.clone(),
            ListFilesEntry {
                metas: value1,
                size_bytes: size1,
                expires: None,
            },
        ),
        (
            key2.clone(),
            ListFilesEntry {
                metas: value2,
                size_bytes: size2,
                expires: None,
            },
        ),
    ]);
    assert_eq!(cache.list_entries(), expected);

    // `clear` drops everything.
    cache.clear();
    assert_eq!(cache.len(), 0);
    assert!(!cache.contains_key(&key1));
    assert!(!cache.contains_key(&key2));
}
611
#[test]
fn test_lru_eviction_basic() {
    let (path1, value1, entry_size) = create_test_list_files_entry("path1", 1, 100);
    let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
    let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);

    // Room for exactly three entries of this size.
    let cache = DefaultListFilesCache::new(entry_size * 3, None);

    let table = Some(TableReference::from("table"));
    let scoped = |path: Path| TableScopedPath {
        table: table.clone(),
        path,
    };
    let key1 = scoped(path1);
    let key2 = scoped(path2);
    let key3 = scoped(path3);

    cache.put(&key1, value1);
    cache.put(&key2, value2);
    cache.put(&key3, value3);
    assert_eq!(cache.len(), 3);
    assert!(cache.contains_key(&key1));
    assert!(cache.contains_key(&key2));
    assert!(cache.contains_key(&key3));

    // A fourth entry pushes the cache over its limit.
    let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
    let key4 = scoped(path4);
    cache.put(&key4, value4);

    // The oldest entry (key1) is the one that gets evicted.
    assert_eq!(cache.len(), 3);
    assert!(!cache.contains_key(&key1));
    assert!(cache.contains_key(&key2));
    assert!(cache.contains_key(&key3));
    assert!(cache.contains_key(&key4));
}
658
#[test]
fn test_lru_ordering_after_access() {
    let (path1, value1, entry_size) = create_test_list_files_entry("path1", 1, 100);
    let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
    let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);

    // Room for exactly three entries of this size.
    let cache = DefaultListFilesCache::new(entry_size * 3, None);

    let table = Some(TableReference::from("table"));
    let scoped = |path: Path| TableScopedPath {
        table: table.clone(),
        path,
    };
    let key1 = scoped(path1);
    let key2 = scoped(path2);
    let key3 = scoped(path3);

    cache.put(&key1, value1);
    cache.put(&key2, value2);
    cache.put(&key3, value3);
    assert_eq!(cache.len(), 3);

    // Touch key1 so that key2 becomes the least recently used entry.
    cache.get(&key1);

    let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
    let key4 = scoped(path4);
    cache.put(&key4, value4);

    // key2 is evicted; the recently read key1 survives.
    assert_eq!(cache.len(), 3);
    assert!(cache.contains_key(&key1));
    assert!(!cache.contains_key(&key2));
    assert!(cache.contains_key(&key3));
    assert!(cache.contains_key(&key4));
}
705
#[test]
fn test_reject_too_large() {
    let (path1, value1, entry_size) = create_test_list_files_entry("path1", 1, 100);
    let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);

    // Room for exactly two entries of this size.
    let cache = DefaultListFilesCache::new(entry_size * 2, None);

    let table = Some(TableReference::from("table"));
    let scoped = |path: Path| TableScopedPath {
        table: table.clone(),
        path,
    };
    let key1 = scoped(path1);
    let key2 = scoped(path2);
    cache.put(&key1, value1);
    cache.put(&key2, value2);
    assert_eq!(cache.len(), 2);

    // This entry is bigger than the whole cache limit.
    let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000);
    let key_large = scoped(path_large);
    cache.put(&key_large, value_large);

    // It is rejected and the existing entries are left alone.
    assert!(!cache.contains_key(&key_large));
    assert_eq!(cache.len(), 2);
    assert!(cache.contains_key(&key1));
    assert!(cache.contains_key(&key2));
}
741
#[test]
fn test_multiple_evictions() {
    let (path1, value1, entry_size) = create_test_list_files_entry("path1", 1, 100);
    let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
    let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);

    // Room for exactly three entries of this size.
    let cache = DefaultListFilesCache::new(entry_size * 3, None);

    let table = Some(TableReference::from("table"));
    let scoped = |path: Path| TableScopedPath {
        table: table.clone(),
        path,
    };
    let key1 = scoped(path1);
    let key2 = scoped(path2);
    let key3 = scoped(path3);
    cache.put(&key1, value1);
    cache.put(&key2, value2);
    cache.put(&key3, value3);
    assert_eq!(cache.len(), 3);

    // A larger entry that fits on its own but needs more than one slot freed.
    let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200);
    let key_large = scoped(path_large);
    cache.put(&key_large, value_large);

    // The two oldest entries are evicted to make room.
    assert_eq!(cache.len(), 2);
    assert!(!cache.contains_key(&key1));
    assert!(!cache.contains_key(&key2));
    assert!(cache.contains_key(&key3));
    assert!(cache.contains_key(&key_large));
}
784
#[test]
fn test_cache_limit_resize() {
    let (path1, value1, entry_size) = create_test_list_files_entry("path1", 1, 100);
    let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
    let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);

    let cache = DefaultListFilesCache::new(entry_size * 3, None);

    let table = Some(TableReference::from("table"));
    let scoped = |path: Path| TableScopedPath {
        table: table.clone(),
        path,
    };
    let key1 = scoped(path1);
    let key2 = scoped(path2);
    let key3 = scoped(path3);

    cache.put(&key1, value1);
    cache.put(&key2, value2);
    cache.put(&key3, value3);
    assert_eq!(cache.len(), 3);

    // Shrink the limit so that only one entry fits.
    cache.update_cache_limit(entry_size);

    // Only the most recently inserted entry remains.
    assert_eq!(cache.len(), 1);
    assert!(cache.contains_key(&key3));
    assert!(!cache.contains_key(&key1));
    assert!(!cache.contains_key(&key2));
}
822
#[test]
fn test_entry_update_with_size_change() {
    let (path1, value1, entry_size) = create_test_list_files_entry("path1", 1, 100);
    let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100);
    let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100);

    let cache = DefaultListFilesCache::new(entry_size * 3, None);

    let table = Some(TableReference::from("table"));
    let scoped = |path: Path| TableScopedPath {
        table: table.clone(),
        path,
    };
    let key1 = scoped(path1);
    let key2 = scoped(path2);
    let key3 = scoped(path3);

    cache.put(&key1, value1);
    cache.put(&key2, Arc::clone(&value2));
    cache.put(&key3, value3_v1);
    assert_eq!(cache.len(), 3);

    // Replacing key3 with a same-sized value evicts nothing.
    let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100);
    cache.put(&key3, value3_v2);

    assert_eq!(cache.len(), 3);
    assert!(cache.contains_key(&key1));
    assert!(cache.contains_key(&key2));
    assert!(cache.contains_key(&key3));

    // Replacing key3 with a larger value pushes usage over the limit.
    let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200);
    cache.put(&key3, Arc::clone(&value3_v3));

    // The oldest entry (key1) is evicted to compensate.
    assert_eq!(cache.len(), 2);
    assert!(!cache.contains_key(&key1));
    assert!(cache.contains_key(&key2));
    assert!(cache.contains_key(&key3));

    let expected = HashMap::from([
        (
            key2,
            ListFilesEntry {
                metas: value2,
                size_bytes: size2,
                expires: None,
            },
        ),
        (
            key3,
            ListFilesEntry {
                metas: value3_v3,
                size_bytes: size3_v3,
                expires: None,
            },
        ),
    ]);
    assert_eq!(cache.list_entries(), expected);
}
891
#[test]
fn test_cache_with_ttl() {
    let ttl = Duration::from_millis(100);

    let mock_time = Arc::new(MockTimeProvider::new());
    let cache = DefaultListFilesCache::new(10000, Some(ttl))
        .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);

    let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
    let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);

    let table = Some(TableReference::from("table"));
    let scoped = |path: Path| TableScopedPath {
        table: table.clone(),
        path,
    };
    let key1 = scoped(path1);
    let key2 = scoped(path2);
    cache.put(&key1, Arc::clone(&value1));
    cache.put(&key2, Arc::clone(&value2));

    // Both entries are readable before the TTL elapses.
    assert!(cache.get(&key1).is_some());
    assert!(cache.get(&key2).is_some());

    // The mock clock has not moved, so both entries expire at `now + ttl`.
    let expected = HashMap::from([
        (
            key1.clone(),
            ListFilesEntry {
                metas: value1,
                size_bytes: size1,
                expires: mock_time.now().checked_add(ttl),
            },
        ),
        (
            key2.clone(),
            ListFilesEntry {
                metas: value2,
                size_bytes: size2,
                expires: mock_time.now().checked_add(ttl),
            },
        ),
    ]);
    assert_eq!(cache.list_entries(), expected);

    // Move past the TTL.
    mock_time.inc(Duration::from_millis(150));

    // Expired entries are dropped lazily: each lookup removes only its own key.
    assert!(cache.get(&key1).is_none());
    assert_eq!(cache.len(), 1);
    assert!(!cache.contains_key(&key2));
    assert_eq!(cache.len(), 0);
}
949
#[test]
fn test_cache_with_ttl_and_lru() {
    let ttl = Duration::from_millis(200);

    let mock_time = Arc::new(MockTimeProvider::new());
    let cache = DefaultListFilesCache::new(1000, Some(ttl))
        .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);

    let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400);
    let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
    let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);

    let table = Some(TableReference::from("table"));
    let scoped = |path: Path| TableScopedPath {
        table: table.clone(),
        path,
    };
    let key1 = scoped(path1);
    let key2 = scoped(path2);
    let key3 = scoped(path3);

    // Insert the entries 50 ms apart on the mock clock.
    cache.put(&key1, value1);
    mock_time.inc(Duration::from_millis(50));
    cache.put(&key2, value2);
    mock_time.inc(Duration::from_millis(50));

    // The third entry does not fit alongside the first two: key1 is evicted.
    cache.put(&key3, value3);
    assert!(!cache.contains_key(&key1));
    assert!(cache.contains_key(&key2));
    assert!(cache.contains_key(&key3));

    // 201 ms after key2 was inserted, but only 151 ms after key3.
    mock_time.inc(Duration::from_millis(151));

    assert!(!cache.contains_key(&key2));
    assert!(cache.contains_key(&key3));
}
991
#[test]
fn test_meta_heap_bytes_calculation() {
    // Every case uses the 4-byte location "test"; only the optional fields vary.
    let build = |e_tag: Option<&str>, version: Option<&str>| ObjectMeta {
        location: Path::from("test"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: e_tag.map(|s| s.to_string()),
        version: version.map(|s| s.to_string()),
    };

    // Location only.
    let meta1 = build(None, None);
    assert_eq!(meta_heap_bytes(&meta1), 4);

    // Location + e_tag.
    let meta2 = build(Some("etag123"), None);
    assert_eq!(meta_heap_bytes(&meta2), 4 + 7);

    // Location + version.
    let meta3 = build(None, Some("v1.0"));
    assert_eq!(meta_heap_bytes(&meta3), 4 + 4);

    // Location + e_tag + version.
    let meta4 = build(Some("tag"), Some("ver"));
    assert_eq!(meta_heap_bytes(&meta4), 4 + 3 + 3);
}
1034
#[test]
fn test_entry_creation() {
    let now = Instant::now();

    // An empty listing yields no entry.
    let no_files: Arc<Vec<ObjectMeta>> = Arc::new(Vec::new());
    assert!(ListFilesEntry::try_new(no_files, None, now).is_none());

    // Five files with 30-byte locations and no e_tag/version.
    let five_files: Arc<Vec<ObjectMeta>> = Arc::new(
        (0..5)
            .map(|i| create_test_object_meta(&format!("file{i}"), 30))
            .collect(),
    );
    let entry = ListFilesEntry::try_new(five_files, None, now).unwrap();
    assert_eq!(entry.metas.len(), 5);
    let inline_bytes = entry.metas.capacity() * size_of::<ObjectMeta>();
    let heap_bytes = entry.metas.len() * 30;
    assert_eq!(entry.size_bytes, inline_bytes + heap_bytes);

    // With a TTL the expiry lies strictly after `now`.
    let single = Arc::new(vec![create_test_object_meta("file", 50)]);
    let with_ttl =
        ListFilesEntry::try_new(single, Some(Duration::from_secs(10)), now).unwrap();
    assert!(with_ttl.expires.unwrap() > now);
}
1062
#[test]
fn test_memory_tracking() {
    let cache = DefaultListFilesCache::new(1000, None);

    // Reads the counter under the lock; the guard is released before returning.
    let memory_used = || cache.state.lock().unwrap().memory_used;

    // Nothing stored yet.
    assert_eq!(memory_used(), 0);

    let table = Some(TableReference::from("table"));
    let scoped = |path: Path| TableScopedPath {
        table: table.clone(),
        path,
    };

    // First entry.
    let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100);
    let key1 = scoped(path1);
    cache.put(&key1, value1);
    assert_eq!(memory_used(), size1);

    // Second entry adds to the total.
    let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200);
    let key2 = scoped(path2);
    cache.put(&key2, value2);
    assert_eq!(memory_used(), size1 + size2);

    // Removing the first entry releases its share.
    cache.remove(&key1);
    assert_eq!(memory_used(), size2);

    // Clearing resets the counter.
    cache.clear();
    assert_eq!(memory_used(), 0);
}
1112
1113 fn create_object_meta_with_path(location: &str) -> ObjectMeta {
1117 ObjectMeta {
1118 location: Path::from(location),
1119 last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
1120 .unwrap()
1121 .into(),
1122 size: 1024,
1123 e_tag: None,
1124 version: None,
1125 }
1126 }
1127
#[test]
fn test_prefix_aware_cache_hit() {
    let cache = DefaultListFilesCache::new(100000, None);

    // Cache the full listing of a table with two partitions.
    let listing = Arc::new(vec![
        create_object_meta_with_path("my_table/a=1/file1.parquet"),
        create_object_meta_with_path("my_table/a=1/file2.parquet"),
        create_object_meta_with_path("my_table/a=2/file3.parquet"),
        create_object_meta_with_path("my_table/a=2/file4.parquet"),
    ]);
    let key = TableScopedPath {
        table: Some(TableReference::from("table")),
        path: Path::from("my_table"),
    };
    cache.put(&key, listing);

    // Asking for partition a=1 returns only its two files.
    let hits_a1 = cache
        .get_with_extra(&key, &Some(Path::from("a=1")))
        .expect("prefix a=1 should hit the cache");
    assert_eq!(hits_a1.len(), 2);
    assert!(
        hits_a1
            .iter()
            .all(|m| m.location.as_ref().starts_with("my_table/a=1"))
    );

    // Likewise for partition a=2.
    let hits_a2 = cache
        .get_with_extra(&key, &Some(Path::from("a=2")))
        .expect("prefix a=2 should hit the cache");
    assert_eq!(hits_a2.len(), 2);
    assert!(
        hits_a2
            .iter()
            .all(|m| m.location.as_ref().starts_with("my_table/a=2"))
    );
}
1178
#[test]
fn test_prefix_aware_cache_no_filter_returns_all() {
    let cache = DefaultListFilesCache::new(100000, None);

    let listing = Arc::new(vec![
        create_object_meta_with_path("my_table/a=1/file1.parquet"),
        create_object_meta_with_path("my_table/a=1/file2.parquet"),
        create_object_meta_with_path("my_table/a=2/file3.parquet"),
        create_object_meta_with_path("my_table/a=2/file4.parquet"),
    ]);
    let key = TableScopedPath {
        table: Some(TableReference::from("table")),
        path: Path::from("my_table"),
    };
    cache.put(&key, listing);

    // Without a prefix every cached file is returned.
    let unfiltered = cache.get_with_extra(&key, &None);
    assert!(unfiltered.is_some());
    assert_eq!(unfiltered.unwrap().len(), 4);

    // Plain `get` behaves the same way.
    let via_get = cache.get(&key);
    assert!(via_get.is_some());
    assert_eq!(via_get.unwrap().len(), 4);
}
1211
#[test]
fn test_prefix_aware_cache_miss_no_entry() {
    let cache = DefaultListFilesCache::new(100000, None);

    // Nothing is ever stored under this key.
    let key = TableScopedPath {
        table: Some(TableReference::from("table")),
        path: Path::from("my_table"),
    };

    // Lookup without a prefix misses.
    assert!(cache.get_with_extra(&key, &None).is_none());

    // Lookup with a prefix misses as well.
    let prefix = Some(Path::from("a=1"));
    assert!(cache.get_with_extra(&key, &prefix).is_none());
}
1233
#[test]
fn test_prefix_aware_cache_no_matching_files() {
    let cache = DefaultListFilesCache::new(100000, None);

    let listing = Arc::new(vec![
        create_object_meta_with_path("my_table/a=1/file1.parquet"),
        create_object_meta_with_path("my_table/a=2/file2.parquet"),
    ]);
    let key = TableScopedPath {
        table: Some(TableReference::from("table")),
        path: Path::from("my_table"),
    };
    cache.put(&key, listing);

    // No cached file lives under a=3, so the lookup is reported as a miss.
    let missing_partition = Some(Path::from("a=3"));
    assert!(cache.get_with_extra(&key, &missing_partition).is_none());
}
1258
#[test]
fn test_prefix_aware_nested_partitions() {
    let cache = DefaultListFilesCache::new(100000, None);

    // Three levels of partitioning: year / month / day.
    let listing = Arc::new(vec![
        create_object_meta_with_path("events/year=2024/month=01/day=01/file1.parquet"),
        create_object_meta_with_path("events/year=2024/month=01/day=02/file2.parquet"),
        create_object_meta_with_path("events/year=2024/month=02/day=01/file3.parquet"),
        create_object_meta_with_path("events/year=2025/month=01/day=01/file4.parquet"),
    ]);
    let key = TableScopedPath {
        table: Some(TableReference::from("table")),
        path: Path::from("events"),
    };
    cache.put(&key, listing);

    // Month-level prefix: two files.
    let by_month = cache.get_with_extra(&key, &Some(Path::from("year=2024/month=01")));
    assert!(by_month.is_some());
    assert_eq!(by_month.unwrap().len(), 2);

    // Year-level prefix: three files.
    let by_year = cache.get_with_extra(&key, &Some(Path::from("year=2024")));
    assert!(by_year.is_some());
    assert_eq!(by_year.unwrap().len(), 3);

    // Day-level prefix: a single file.
    let by_day =
        cache.get_with_extra(&key, &Some(Path::from("year=2024/month=01/day=01")));
    assert!(by_day.is_some());
    assert_eq!(by_day.unwrap().len(), 1);
}
1304
#[test]
fn test_prefix_aware_different_tables() {
    let cache = DefaultListFilesCache::new(100000, None);

    // Two tables, each cached under its own key.
    let key_a = TableScopedPath {
        table: Some(TableReference::from("table_a")),
        path: Path::from("table_a"),
    };
    let key_b = TableScopedPath {
        table: Some(TableReference::from("table_b")),
        path: Path::from("table_b"),
    };

    let listing_a = Arc::new(vec![create_object_meta_with_path(
        "table_a/part=1/file1.parquet",
    )]);
    let listing_b = Arc::new(vec![
        create_object_meta_with_path("table_b/part=1/file1.parquet"),
        create_object_meta_with_path("table_b/part=2/file2.parquet"),
    ]);

    cache.put(&key_a, listing_a);
    cache.put(&key_b, listing_b);

    // Table A returns its single file.
    let all_of_a = cache.get(&key_a);
    assert!(all_of_a.is_some());
    assert_eq!(all_of_a.unwrap().len(), 1);

    // A prefix lookup on table B only sees table B's files.
    let part1_of_b = cache.get_with_extra(&key_b, &Some(Path::from("part=1")));
    assert!(part1_of_b.is_some());
    assert_eq!(part1_of_b.unwrap().len(), 1);
}
1345
#[test]
fn test_drop_table_entries() {
    let cache = DefaultListFilesCache::default();

    let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100);
    let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
    let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);

    // Two entries belong to table1, one to table2.
    let table1 = Some(TableReference::from("table1"));
    let table2 = Some(TableReference::from("table2"));
    let key1 = TableScopedPath {
        table: table1.clone(),
        path: path1,
    };
    let key2 = TableScopedPath {
        table: table1.clone(),
        path: path2,
    };
    let key3 = TableScopedPath {
        table: table2.clone(),
        path: path3,
    };

    cache.put(&key1, value1);
    cache.put(&key2, value2);
    cache.put(&key3, value3);

    cache.drop_table_entries(&table1).unwrap();

    // Only table2's entry survives.
    assert!(!cache.contains_key(&key1));
    assert!(!cache.contains_key(&key2));
    assert!(cache.contains_key(&key3));
}
1380}