1use std::mem::size_of;
19use std::{
20 collections::HashMap,
21 sync::{Arc, Mutex},
22 time::Duration,
23};
24
25use datafusion_common::TableReference;
26use datafusion_common::instant::Instant;
27use object_store::{ObjectMeta, path::Path};
28
29use crate::cache::{
30 CacheAccessor,
31 cache_manager::{CachedFileList, ListFilesCache},
32 lru_queue::LruQueue,
33};
34
/// Source of the current time used by the list-files cache.
///
/// Putting the clock behind a trait lets a caller substitute its own notion
/// of "now" for [`Instant::now`].
pub trait TimeProvider: Send + Sync + 'static {
    /// Returns the instant this provider considers to be the present moment.
    fn now(&self) -> Instant;
}
38
39#[derive(Debug, Default)]
40pub struct SystemTimeProvider;
41
42impl TimeProvider for SystemTimeProvider {
43 fn now(&self) -> Instant {
44 Instant::now()
45 }
46}
47
48pub struct DefaultListFilesCache {
64 state: Mutex<DefaultListFilesCacheState>,
65 time_provider: Arc<dyn TimeProvider>,
66}
67
68impl Default for DefaultListFilesCache {
69 fn default() -> Self {
70 Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None)
71 }
72}
73
74impl DefaultListFilesCache {
75 pub fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
81 Self {
82 state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)),
83 time_provider: Arc::new(SystemTimeProvider),
84 }
85 }
86
87 #[cfg(test)]
88 pub(crate) fn with_time_provider(mut self, provider: Arc<dyn TimeProvider>) -> Self {
89 self.time_provider = provider;
90 self
91 }
92}
93
94#[derive(Clone, PartialEq, Debug)]
95pub struct ListFilesEntry {
96 pub metas: CachedFileList,
97 pub size_bytes: usize,
98 pub expires: Option<Instant>,
99}
100
101impl ListFilesEntry {
102 fn try_new(
103 cached_file_list: CachedFileList,
104 ttl: Option<Duration>,
105 now: Instant,
106 ) -> Option<Self> {
107 let size_bytes = (cached_file_list.files.capacity() * size_of::<ObjectMeta>())
108 + cached_file_list
109 .files
110 .iter()
111 .map(meta_heap_bytes)
112 .reduce(|acc, b| acc + b)?;
113
114 Some(Self {
115 metas: cached_file_list,
116 size_bytes,
117 expires: ttl.map(|t| now + t),
118 })
119 }
120}
121
122fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
124 let mut size = object_meta.location.as_ref().len();
125
126 if let Some(e) = &object_meta.e_tag {
127 size += e.len();
128 }
129 if let Some(v) = &object_meta.version {
130 size += v.len();
131 }
132
133 size
134}
135
136pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None; #[derive(PartialEq, Eq, Hash, Clone, Debug)]
148pub struct TableScopedPath {
149 pub table: Option<TableReference>,
150 pub path: Path,
151}
152
153pub struct DefaultListFilesCacheState {
155 lru_queue: LruQueue<TableScopedPath, ListFilesEntry>,
156 memory_limit: usize,
157 memory_used: usize,
158 ttl: Option<Duration>,
159}
160
161impl Default for DefaultListFilesCacheState {
162 fn default() -> Self {
163 Self {
164 lru_queue: LruQueue::new(),
165 memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
166 memory_used: 0,
167 ttl: DEFAULT_LIST_FILES_CACHE_TTL,
168 }
169 }
170}
171
172impl DefaultListFilesCacheState {
173 fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
174 Self {
175 lru_queue: LruQueue::new(),
176 memory_limit,
177 memory_used: 0,
178 ttl,
179 }
180 }
181
182 fn get(&mut self, key: &TableScopedPath, now: Instant) -> Option<CachedFileList> {
187 let entry = self.lru_queue.get(key)?;
188
189 if let Some(exp) = entry.expires
191 && now > exp
192 {
193 self.remove(key);
194 return None;
195 }
196
197 Some(entry.metas.clone())
198 }
199
200 fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool {
206 let Some(entry) = self.lru_queue.peek(k) else {
207 return false;
208 };
209
210 match entry.expires {
211 Some(exp) if now > exp => {
212 self.remove(k);
213 false
214 }
215 _ => true,
216 }
217 }
218
219 fn put(
225 &mut self,
226 key: &TableScopedPath,
227 value: CachedFileList,
228 now: Instant,
229 ) -> Option<CachedFileList> {
230 let entry = ListFilesEntry::try_new(value, self.ttl, now)?;
231 let entry_size = entry.size_bytes;
232
233 if entry_size > self.memory_limit {
235 return None;
236 }
237
238 let old_value = self.lru_queue.put(key.clone(), entry);
240 self.memory_used += entry_size;
241
242 if let Some(entry) = &old_value {
243 self.memory_used -= entry.size_bytes;
244 }
245
246 self.evict_entries();
247
248 old_value.map(|v| v.metas)
249 }
250
251 fn evict_entries(&mut self) {
253 while self.memory_used > self.memory_limit {
254 if let Some(removed) = self.lru_queue.pop() {
255 self.memory_used -= removed.1.size_bytes;
256 } else {
257 debug_assert!(
259 false,
260 "cache is empty while memory_used > memory_limit, cannot happen"
261 );
262 return;
263 }
264 }
265 }
266
267 fn remove(&mut self, k: &TableScopedPath) -> Option<CachedFileList> {
269 if let Some(entry) = self.lru_queue.remove(k) {
270 self.memory_used -= entry.size_bytes;
271 Some(entry.metas)
272 } else {
273 None
274 }
275 }
276
277 fn len(&self) -> usize {
279 self.lru_queue.len()
280 }
281
282 fn clear(&mut self) {
284 self.lru_queue.clear();
285 self.memory_used = 0;
286 }
287}
288
289impl CacheAccessor<TableScopedPath, CachedFileList> for DefaultListFilesCache {
290 fn get(&self, key: &TableScopedPath) -> Option<CachedFileList> {
291 let mut state = self.state.lock().unwrap();
292 let now = self.time_provider.now();
293 state.get(key, now)
294 }
295
296 fn put(
297 &self,
298 key: &TableScopedPath,
299 value: CachedFileList,
300 ) -> Option<CachedFileList> {
301 let mut state = self.state.lock().unwrap();
302 let now = self.time_provider.now();
303 state.put(key, value, now)
304 }
305
306 fn remove(&self, k: &TableScopedPath) -> Option<CachedFileList> {
307 let mut state = self.state.lock().unwrap();
308 state.remove(k)
309 }
310
311 fn contains_key(&self, k: &TableScopedPath) -> bool {
312 let mut state = self.state.lock().unwrap();
313 let now = self.time_provider.now();
314 state.contains_key(k, now)
315 }
316
317 fn len(&self) -> usize {
318 let state = self.state.lock().unwrap();
319 state.len()
320 }
321
322 fn clear(&self) {
323 let mut state = self.state.lock().unwrap();
324 state.clear();
325 }
326
327 fn name(&self) -> String {
328 String::from("DefaultListFilesCache")
329 }
330}
331
332impl ListFilesCache for DefaultListFilesCache {
333 fn cache_limit(&self) -> usize {
334 let state = self.state.lock().unwrap();
335 state.memory_limit
336 }
337
338 fn cache_ttl(&self) -> Option<Duration> {
339 let state = self.state.lock().unwrap();
340 state.ttl
341 }
342
343 fn update_cache_limit(&self, limit: usize) {
344 let mut state = self.state.lock().unwrap();
345 state.memory_limit = limit;
346 state.evict_entries();
347 }
348
349 fn update_cache_ttl(&self, ttl: Option<Duration>) {
350 let mut state = self.state.lock().unwrap();
351 state.ttl = ttl;
352 state.evict_entries();
353 }
354
355 fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry> {
356 let state = self.state.lock().unwrap();
357 let mut entries = HashMap::<TableScopedPath, ListFilesEntry>::new();
358 for (path, entry) in state.lru_queue.list_entries() {
359 entries.insert(path.clone(), entry.clone());
360 }
361 entries
362 }
363
364 fn drop_table_entries(
365 &self,
366 table_ref: &Option<TableReference>,
367 ) -> datafusion_common::Result<()> {
368 let mut state = self.state.lock().unwrap();
369 let mut table_paths = vec![];
370 for (path, _) in state.lru_queue.list_entries() {
371 if path.table == *table_ref {
372 table_paths.push(path.clone());
373 }
374 }
375 for path in table_paths {
376 state.remove(&path);
377 }
378 Ok(())
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use super::*;
385 use chrono::DateTime;
386 use std::thread;
387
388 struct MockTimeProvider {
389 base: Instant,
390 offset: Mutex<Duration>,
391 }
392
393 impl MockTimeProvider {
394 fn new() -> Self {
395 Self {
396 base: Instant::now(),
397 offset: Mutex::new(Duration::ZERO),
398 }
399 }
400
401 fn inc(&self, duration: Duration) {
402 let mut offset = self.offset.lock().unwrap();
403 *offset += duration;
404 }
405 }
406
407 impl TimeProvider for MockTimeProvider {
408 fn now(&self) -> Instant {
409 self.base + *self.offset.lock().unwrap()
410 }
411 }
412
413 fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta {
415 let location_str = if location_size > path.len() {
417 format!("{}{}", path, "0".repeat(location_size - path.len()))
418 } else {
419 path.to_string()
420 };
421
422 ObjectMeta {
423 location: Path::from(location_str),
424 last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
425 .unwrap()
426 .into(),
427 size: 1024,
428 e_tag: None,
429 version: None,
430 }
431 }
432
433 fn create_test_list_files_entry(
435 path: &str,
436 count: usize,
437 meta_size: usize,
438 ) -> (Path, CachedFileList, usize) {
439 let metas: Vec<ObjectMeta> = (0..count)
440 .map(|i| create_test_object_meta(&format!("file{i}"), meta_size))
441 .collect();
442
443 let size = (metas.capacity() * size_of::<ObjectMeta>())
445 + metas.iter().map(meta_heap_bytes).sum::<usize>();
446
447 (Path::from(path), CachedFileList::new(metas), size)
448 }
449
450 #[test]
451 fn test_basic_operations() {
452 let cache = DefaultListFilesCache::default();
453 let table_ref = Some(TableReference::from("table"));
454 let path = Path::from("test_path");
455 let key = TableScopedPath {
456 table: table_ref.clone(),
457 path,
458 };
459
460 assert!(!cache.contains_key(&key));
462 assert_eq!(cache.len(), 0);
463
464 assert!(cache.get(&key).is_none());
466
467 let meta = create_test_object_meta("file1", 50);
469 cache.put(&key, CachedFileList::new(vec![meta]));
470
471 assert!(cache.contains_key(&key));
473 assert_eq!(cache.len(), 1);
474 let result = cache.get(&key).unwrap();
475 assert_eq!(result.files.len(), 1);
476
477 let removed = cache.remove(&key).unwrap();
479 assert_eq!(removed.files.len(), 1);
480 assert!(!cache.contains_key(&key));
481 assert_eq!(cache.len(), 0);
482
483 let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
485 let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50);
486 let key1 = TableScopedPath {
487 table: table_ref.clone(),
488 path: path1,
489 };
490 let key2 = TableScopedPath {
491 table: table_ref,
492 path: path2,
493 };
494 cache.put(&key1, value1.clone());
495 cache.put(&key2, value2.clone());
496 assert_eq!(cache.len(), 2);
497
498 assert_eq!(
500 cache.list_entries(),
501 HashMap::from([
502 (
503 key1.clone(),
504 ListFilesEntry {
505 metas: value1,
506 size_bytes: size1,
507 expires: None,
508 }
509 ),
510 (
511 key2.clone(),
512 ListFilesEntry {
513 metas: value2,
514 size_bytes: size2,
515 expires: None,
516 }
517 )
518 ])
519 );
520
521 cache.clear();
523 assert_eq!(cache.len(), 0);
524 assert!(!cache.contains_key(&key1));
525 assert!(!cache.contains_key(&key2));
526 }
527
528 #[test]
529 fn test_lru_eviction_basic() {
530 let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
531 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
532 let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
533
534 let cache = DefaultListFilesCache::new(size * 3, None);
536
537 let table_ref = Some(TableReference::from("table"));
538 let key1 = TableScopedPath {
539 table: table_ref.clone(),
540 path: path1,
541 };
542 let key2 = TableScopedPath {
543 table: table_ref.clone(),
544 path: path2,
545 };
546 let key3 = TableScopedPath {
547 table: table_ref.clone(),
548 path: path3,
549 };
550
551 cache.put(&key1, value1);
553 cache.put(&key2, value2);
554 cache.put(&key3, value3);
555 assert_eq!(cache.len(), 3);
556 assert!(cache.contains_key(&key1));
557 assert!(cache.contains_key(&key2));
558 assert!(cache.contains_key(&key3));
559
560 let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
562 let key4 = TableScopedPath {
563 table: table_ref,
564 path: path4,
565 };
566 cache.put(&key4, value4);
567
568 assert_eq!(cache.len(), 3);
569 assert!(!cache.contains_key(&key1)); assert!(cache.contains_key(&key2));
571 assert!(cache.contains_key(&key3));
572 assert!(cache.contains_key(&key4));
573 }
574
575 #[test]
576 fn test_lru_ordering_after_access() {
577 let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
578 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
579 let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
580
581 let cache = DefaultListFilesCache::new(size * 3, None);
583
584 let table_ref = Some(TableReference::from("table"));
585 let key1 = TableScopedPath {
586 table: table_ref.clone(),
587 path: path1,
588 };
589 let key2 = TableScopedPath {
590 table: table_ref.clone(),
591 path: path2,
592 };
593 let key3 = TableScopedPath {
594 table: table_ref.clone(),
595 path: path3,
596 };
597
598 cache.put(&key1, value1);
599 cache.put(&key2, value2);
600 cache.put(&key3, value3);
601 assert_eq!(cache.len(), 3);
602
603 let _ = cache.get(&key1);
606
607 let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
609 let key4 = TableScopedPath {
610 table: table_ref,
611 path: path4,
612 };
613 cache.put(&key4, value4);
614
615 assert_eq!(cache.len(), 3);
616 assert!(cache.contains_key(&key1)); assert!(!cache.contains_key(&key2)); assert!(cache.contains_key(&key3));
619 assert!(cache.contains_key(&key4));
620 }
621
622 #[test]
623 fn test_reject_too_large() {
624 let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
625 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
626
627 let cache = DefaultListFilesCache::new(size * 2, None);
629
630 let table_ref = Some(TableReference::from("table"));
631 let key1 = TableScopedPath {
632 table: table_ref.clone(),
633 path: path1,
634 };
635 let key2 = TableScopedPath {
636 table: table_ref.clone(),
637 path: path2,
638 };
639 cache.put(&key1, value1);
640 cache.put(&key2, value2);
641 assert_eq!(cache.len(), 2);
642
643 let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000);
646 let key_large = TableScopedPath {
647 table: table_ref,
648 path: path_large,
649 };
650 cache.put(&key_large, value_large);
651
652 assert!(!cache.contains_key(&key_large));
654 assert_eq!(cache.len(), 2);
655 assert!(cache.contains_key(&key1));
656 assert!(cache.contains_key(&key2));
657 }
658
659 #[test]
660 fn test_multiple_evictions() {
661 let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
662 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
663 let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
664
665 let cache = DefaultListFilesCache::new(size * 3, None);
667
668 let table_ref = Some(TableReference::from("table"));
669 let key1 = TableScopedPath {
670 table: table_ref.clone(),
671 path: path1,
672 };
673 let key2 = TableScopedPath {
674 table: table_ref.clone(),
675 path: path2,
676 };
677 let key3 = TableScopedPath {
678 table: table_ref.clone(),
679 path: path3,
680 };
681 cache.put(&key1, value1);
682 cache.put(&key2, value2);
683 cache.put(&key3, value3);
684 assert_eq!(cache.len(), 3);
685
686 let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200);
688 let key_large = TableScopedPath {
689 table: table_ref,
690 path: path_large,
691 };
692 cache.put(&key_large, value_large);
693
694 assert_eq!(cache.len(), 2);
696 assert!(!cache.contains_key(&key1)); assert!(!cache.contains_key(&key2)); assert!(cache.contains_key(&key3));
699 assert!(cache.contains_key(&key_large));
700 }
701
702 #[test]
703 fn test_cache_limit_resize() {
704 let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
705 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
706 let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
707
708 let cache = DefaultListFilesCache::new(size * 3, None);
709
710 let table_ref = Some(TableReference::from("table"));
711 let key1 = TableScopedPath {
712 table: table_ref.clone(),
713 path: path1,
714 };
715 let key2 = TableScopedPath {
716 table: table_ref.clone(),
717 path: path2,
718 };
719 let key3 = TableScopedPath {
720 table: table_ref,
721 path: path3,
722 };
723 cache.put(&key1, value1);
725 cache.put(&key2, value2);
726 cache.put(&key3, value3);
727 assert_eq!(cache.len(), 3);
728
729 cache.update_cache_limit(size);
731
732 assert_eq!(cache.len(), 1);
734 assert!(cache.contains_key(&key3));
735 assert!(!cache.contains_key(&key1));
737 assert!(!cache.contains_key(&key2));
738 }
739
740 #[test]
741 fn test_entry_update_with_size_change() {
742 let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
743 let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100);
744 let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100);
745
746 let cache = DefaultListFilesCache::new(size * 3, None);
747
748 let table_ref = Some(TableReference::from("table"));
749 let key1 = TableScopedPath {
750 table: table_ref.clone(),
751 path: path1,
752 };
753 let key2 = TableScopedPath {
754 table: table_ref.clone(),
755 path: path2,
756 };
757 let key3 = TableScopedPath {
758 table: table_ref,
759 path: path3,
760 };
761 cache.put(&key1, value1);
763 cache.put(&key2, value2.clone());
764 cache.put(&key3, value3_v1);
765 assert_eq!(cache.len(), 3);
766
767 let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100);
769 cache.put(&key3, value3_v2);
770
771 assert_eq!(cache.len(), 3);
772 assert!(cache.contains_key(&key1));
773 assert!(cache.contains_key(&key2));
774 assert!(cache.contains_key(&key3));
775
776 let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200);
778 cache.put(&key3, value3_v3.clone());
779
780 assert_eq!(cache.len(), 2);
781 assert!(!cache.contains_key(&key1)); assert!(cache.contains_key(&key2));
783 assert!(cache.contains_key(&key3));
784
785 assert_eq!(
787 cache.list_entries(),
788 HashMap::from([
789 (
790 key2,
791 ListFilesEntry {
792 metas: value2,
793 size_bytes: size2,
794 expires: None,
795 }
796 ),
797 (
798 key3,
799 ListFilesEntry {
800 metas: value3_v3,
801 size_bytes: size3_v3,
802 expires: None,
803 }
804 )
805 ])
806 );
807 }
808
809 #[test]
810 fn test_cache_with_ttl() {
811 let ttl = Duration::from_millis(100);
812
813 let mock_time = Arc::new(MockTimeProvider::new());
814 let cache = DefaultListFilesCache::new(10000, Some(ttl))
815 .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
816
817 let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
818 let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);
819
820 let table_ref = Some(TableReference::from("table"));
821 let key1 = TableScopedPath {
822 table: table_ref.clone(),
823 path: path1,
824 };
825 let key2 = TableScopedPath {
826 table: table_ref,
827 path: path2,
828 };
829 cache.put(&key1, value1.clone());
830 cache.put(&key2, value2.clone());
831
832 assert!(cache.get(&key1).is_some());
834 assert!(cache.get(&key2).is_some());
835 assert_eq!(
837 cache.list_entries(),
838 HashMap::from([
839 (
840 key1.clone(),
841 ListFilesEntry {
842 metas: value1,
843 size_bytes: size1,
844 expires: mock_time.now().checked_add(ttl),
845 }
846 ),
847 (
848 key2.clone(),
849 ListFilesEntry {
850 metas: value2,
851 size_bytes: size2,
852 expires: mock_time.now().checked_add(ttl),
853 }
854 )
855 ])
856 );
857 mock_time.inc(Duration::from_millis(150));
859
860 assert!(!cache.contains_key(&key1));
862 assert_eq!(cache.len(), 1); assert!(!cache.contains_key(&key2));
864 assert_eq!(cache.len(), 0); }
866
867 #[test]
868 fn test_cache_with_ttl_and_lru() {
869 let ttl = Duration::from_millis(200);
870
871 let mock_time = Arc::new(MockTimeProvider::new());
872 let cache = DefaultListFilesCache::new(1000, Some(ttl))
873 .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
874
875 let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400);
876 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
877 let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);
878
879 let table_ref = Some(TableReference::from("table"));
880 let key1 = TableScopedPath {
881 table: table_ref.clone(),
882 path: path1,
883 };
884 let key2 = TableScopedPath {
885 table: table_ref.clone(),
886 path: path2,
887 };
888 let key3 = TableScopedPath {
889 table: table_ref,
890 path: path3,
891 };
892 cache.put(&key1, value1);
893 mock_time.inc(Duration::from_millis(50));
894 cache.put(&key2, value2);
895 mock_time.inc(Duration::from_millis(50));
896
897 cache.put(&key3, value3);
899 assert!(!cache.contains_key(&key1)); assert!(cache.contains_key(&key2));
901 assert!(cache.contains_key(&key3));
902
903 mock_time.inc(Duration::from_millis(151));
904
905 assert!(!cache.contains_key(&key2)); assert!(cache.contains_key(&key3)); }
908
909 #[test]
910 fn test_ttl_expiration_in_get() {
911 let ttl = Duration::from_millis(100);
912 let cache = DefaultListFilesCache::new(10000, Some(ttl));
913
914 let (path, value, _) = create_test_list_files_entry("path", 2, 50);
915 let table_ref = Some(TableReference::from("table"));
916 let key = TableScopedPath {
917 table: table_ref,
918 path,
919 };
920
921 cache.put(&key, value.clone());
923
924 let result = cache.get(&key);
926 assert!(result.is_some());
927 assert_eq!(result.unwrap().files.len(), 2);
928
929 thread::sleep(Duration::from_millis(150));
931
932 let result2 = cache.get(&key);
934 assert!(result2.is_none());
935 }
936
937 #[test]
938 fn test_meta_heap_bytes_calculation() {
939 let meta1 = ObjectMeta {
941 location: Path::from("test"),
942 last_modified: chrono::Utc::now(),
943 size: 100,
944 e_tag: None,
945 version: None,
946 };
947 assert_eq!(meta_heap_bytes(&meta1), 4); let meta2 = ObjectMeta {
951 location: Path::from("test"),
952 last_modified: chrono::Utc::now(),
953 size: 100,
954 e_tag: Some("etag123".to_string()),
955 version: None,
956 };
957 assert_eq!(meta_heap_bytes(&meta2), 4 + 7); let meta3 = ObjectMeta {
961 location: Path::from("test"),
962 last_modified: chrono::Utc::now(),
963 size: 100,
964 e_tag: None,
965 version: Some("v1.0".to_string()),
966 };
967 assert_eq!(meta_heap_bytes(&meta3), 4 + 4); let meta4 = ObjectMeta {
971 location: Path::from("test"),
972 last_modified: chrono::Utc::now(),
973 size: 100,
974 e_tag: Some("tag".to_string()),
975 version: Some("ver".to_string()),
976 };
977 assert_eq!(meta_heap_bytes(&meta4), 4 + 3 + 3); }
979
980 #[test]
981 fn test_entry_creation() {
982 let empty_list = CachedFileList::new(vec![]);
984 let now = Instant::now();
985 let entry = ListFilesEntry::try_new(empty_list, None, now);
986 assert!(entry.is_none());
987
988 let metas: Vec<ObjectMeta> = (0..5)
990 .map(|i| create_test_object_meta(&format!("file{i}"), 30))
991 .collect();
992 let cached_list = CachedFileList::new(metas);
993 let entry = ListFilesEntry::try_new(cached_list, None, now).unwrap();
994 assert_eq!(entry.metas.files.len(), 5);
995 let expected_size = (entry.metas.files.capacity() * size_of::<ObjectMeta>())
997 + (entry.metas.files.len() * 30);
998 assert_eq!(entry.size_bytes, expected_size);
999
1000 let meta = create_test_object_meta("file", 50);
1002 let ttl = Duration::from_secs(10);
1003 let cached_list = CachedFileList::new(vec![meta]);
1004 let entry = ListFilesEntry::try_new(cached_list, Some(ttl), now).unwrap();
1005 assert!(entry.expires.unwrap() > now);
1006 }
1007
1008 #[test]
1009 fn test_memory_tracking() {
1010 let cache = DefaultListFilesCache::new(1000, None);
1011
1012 {
1014 let state = cache.state.lock().unwrap();
1015 assert_eq!(state.memory_used, 0);
1016 }
1017
1018 let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100);
1020 let table_ref = Some(TableReference::from("table"));
1021 let key1 = TableScopedPath {
1022 table: table_ref.clone(),
1023 path: path1,
1024 };
1025 cache.put(&key1, value1);
1026 {
1027 let state = cache.state.lock().unwrap();
1028 assert_eq!(state.memory_used, size1);
1029 }
1030
1031 let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200);
1033 let key2 = TableScopedPath {
1034 table: table_ref.clone(),
1035 path: path2,
1036 };
1037 cache.put(&key2, value2);
1038 {
1039 let state = cache.state.lock().unwrap();
1040 assert_eq!(state.memory_used, size1 + size2);
1041 }
1042
1043 cache.remove(&key1);
1045 {
1046 let state = cache.state.lock().unwrap();
1047 assert_eq!(state.memory_used, size2);
1048 }
1049
1050 cache.clear();
1052 {
1053 let state = cache.state.lock().unwrap();
1054 assert_eq!(state.memory_used, 0);
1055 }
1056 }
1057
1058 fn create_object_meta_with_path(location: &str) -> ObjectMeta {
1062 ObjectMeta {
1063 location: Path::from(location),
1064 last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
1065 .unwrap()
1066 .into(),
1067 size: 1024,
1068 e_tag: None,
1069 version: None,
1070 }
1071 }
1072
1073 #[test]
1074 fn test_prefix_filtering() {
1075 let cache = DefaultListFilesCache::new(100000, None);
1076
1077 let table_base = Path::from("my_table");
1079 let files = vec![
1080 create_object_meta_with_path("my_table/a=1/file1.parquet"),
1081 create_object_meta_with_path("my_table/a=1/file2.parquet"),
1082 create_object_meta_with_path("my_table/a=2/file3.parquet"),
1083 create_object_meta_with_path("my_table/a=2/file4.parquet"),
1084 ];
1085
1086 let table_ref = Some(TableReference::from("table"));
1088 let key = TableScopedPath {
1089 table: table_ref,
1090 path: table_base,
1091 };
1092 cache.put(&key, CachedFileList::new(files));
1093
1094 let result = cache.get(&key).unwrap();
1095
1096 let prefix_a1 = Some(Path::from("my_table/a=1"));
1098 let filtered = result.files_matching_prefix(&prefix_a1);
1099 assert_eq!(filtered.len(), 2);
1100 assert!(
1101 filtered
1102 .iter()
1103 .all(|m| m.location.as_ref().starts_with("my_table/a=1"))
1104 );
1105
1106 let prefix_a2 = Some(Path::from("my_table/a=2"));
1108 let filtered_2 = result.files_matching_prefix(&prefix_a2);
1109 assert_eq!(filtered_2.len(), 2);
1110 assert!(
1111 filtered_2
1112 .iter()
1113 .all(|m| m.location.as_ref().starts_with("my_table/a=2"))
1114 );
1115
1116 let all = result.files_matching_prefix(&None);
1118 assert_eq!(all.len(), 4);
1119 }
1120
1121 #[test]
1122 fn test_prefix_no_matching_files() {
1123 let cache = DefaultListFilesCache::new(100000, None);
1124
1125 let table_base = Path::from("my_table");
1126 let files = vec![
1127 create_object_meta_with_path("my_table/a=1/file1.parquet"),
1128 create_object_meta_with_path("my_table/a=2/file2.parquet"),
1129 ];
1130
1131 let table_ref = Some(TableReference::from("table"));
1132 let key = TableScopedPath {
1133 table: table_ref,
1134 path: table_base,
1135 };
1136 cache.put(&key, CachedFileList::new(files));
1137 let result = cache.get(&key).unwrap();
1138
1139 let prefix_a3 = Some(Path::from("my_table/a=3"));
1141 let filtered = result.files_matching_prefix(&prefix_a3);
1142 assert!(filtered.is_empty());
1143 }
1144
1145 #[test]
1146 fn test_nested_partitions() {
1147 let cache = DefaultListFilesCache::new(100000, None);
1148
1149 let table_base = Path::from("events");
1150 let files = vec![
1151 create_object_meta_with_path(
1152 "events/year=2024/month=01/day=01/file1.parquet",
1153 ),
1154 create_object_meta_with_path(
1155 "events/year=2024/month=01/day=02/file2.parquet",
1156 ),
1157 create_object_meta_with_path(
1158 "events/year=2024/month=02/day=01/file3.parquet",
1159 ),
1160 create_object_meta_with_path(
1161 "events/year=2025/month=01/day=01/file4.parquet",
1162 ),
1163 ];
1164
1165 let table_ref = Some(TableReference::from("table"));
1166 let key = TableScopedPath {
1167 table: table_ref,
1168 path: table_base,
1169 };
1170 cache.put(&key, CachedFileList::new(files));
1171 let result = cache.get(&key).unwrap();
1172
1173 let prefix_month = Some(Path::from("events/year=2024/month=01"));
1175 let filtered = result.files_matching_prefix(&prefix_month);
1176 assert_eq!(filtered.len(), 2);
1177
1178 let prefix_year = Some(Path::from("events/year=2024"));
1180 let filtered_year = result.files_matching_prefix(&prefix_year);
1181 assert_eq!(filtered_year.len(), 3);
1182 }
1183
1184 #[test]
1185 fn test_drop_table_entries() {
1186 let cache = DefaultListFilesCache::default();
1187
1188 let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100);
1189 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
1190 let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
1191
1192 let table_ref1 = Some(TableReference::from("table1"));
1193 let key1 = TableScopedPath {
1194 table: table_ref1.clone(),
1195 path: path1,
1196 };
1197 let key2 = TableScopedPath {
1198 table: table_ref1.clone(),
1199 path: path2,
1200 };
1201
1202 let table_ref2 = Some(TableReference::from("table2"));
1203 let key3 = TableScopedPath {
1204 table: table_ref2.clone(),
1205 path: path3,
1206 };
1207
1208 cache.put(&key1, value1);
1209 cache.put(&key2, value2);
1210 cache.put(&key3, value3);
1211
1212 cache.drop_table_entries(&table_ref1).unwrap();
1213
1214 assert!(!cache.contains_key(&key1));
1215 assert!(!cache.contains_key(&key2));
1216 assert!(cache.contains_key(&key3));
1217 }
1218}