1use crate::cache::{
19 CacheAccessor,
20 cache_manager::{CachedFileList, ListFilesCache},
21 lru_queue::LruQueue,
22};
23
24use std::fmt::{Debug, Display, Formatter};
25use std::mem::size_of;
26use std::{
27 collections::HashMap,
28 sync::{Arc, Mutex},
29 time::Duration,
30};
31
32use datafusion_common::TableReference;
33use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx};
34use datafusion_common::instant::Instant;
35use object_store::{ObjectMeta, path::Path};
36
/// Source of the current time used to stamp entry expiry and to check it.
///
/// Kept behind a trait so the clock can be swapped out; tests inject a
/// controllable one through `DefaultListFilesCache::with_time_provider`.
pub trait TimeProvider: Send + Sync + 'static {
    /// Returns the current instant.
    fn now(&self) -> Instant;
}
40
/// [`TimeProvider`] backed by the real clock (`Instant::now`); this is the
/// provider `DefaultListFilesCache::new` installs.
#[derive(Debug, Default)]
pub struct SystemTimeProvider;

impl TimeProvider for SystemTimeProvider {
    fn now(&self) -> Instant {
        Instant::now()
    }
}
49
/// Default [`ListFilesCache`]: an LRU cache of object-store listings bounded
/// by an estimated memory budget and, optionally, by a per-entry TTL.
///
/// All mutable state sits behind one [`Mutex`], so every operation works
/// through `&self`.
pub struct DefaultListFilesCache {
    /// LRU queue, memory accounting and TTL configuration.
    state: Mutex<DefaultListFilesCacheState>,
    /// Clock used to stamp expiry on insertion and to check it on lookup.
    time_provider: Arc<dyn TimeProvider>,
}
69
70impl Default for DefaultListFilesCache {
71 fn default() -> Self {
72 Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None)
73 }
74}
75
76impl DefaultListFilesCache {
77 pub fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
83 Self {
84 state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)),
85 time_provider: Arc::new(SystemTimeProvider),
86 }
87 }
88
89 #[cfg(test)]
90 pub(crate) fn with_time_provider(mut self, provider: Arc<dyn TimeProvider>) -> Self {
91 self.time_provider = provider;
92 self
93 }
94}
95
/// A cached listing together with its bookkeeping data.
#[derive(Clone, PartialEq, Debug)]
pub struct ListFilesEntry {
    /// The cached listing.
    pub metas: CachedFileList,
    /// Estimated footprint in bytes, as computed by `ListFilesEntry::try_new`.
    /// It is the `files` allocation (capacity × `size_of::<ObjectMeta>()`)
    /// plus the string bytes of every meta.
    pub size_bytes: usize,
    /// Instant after which the entry is stale; `None` means it never expires.
    pub expires: Option<Instant>,
}
102
103impl ListFilesEntry {
104 fn try_new(
105 cached_file_list: CachedFileList,
106 ttl: Option<Duration>,
107 now: Instant,
108 ) -> Option<Self> {
109 let size_bytes = (cached_file_list.files.capacity() * size_of::<ObjectMeta>())
110 + cached_file_list
111 .files
112 .iter()
113 .map(meta_heap_bytes)
114 .reduce(|acc, b| acc + b)?;
115
116 Some(Self {
117 metas: cached_file_list,
118 size_bytes,
119 expires: ttl.map(|t| now + t),
120 })
121 }
122}
123
124fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
126 let mut size = object_meta.location.as_ref().len();
127
128 if let Some(e) = &object_meta.e_tag {
129 size += e.len();
130 }
131 if let Some(v) = &object_meta.version {
132 size += v.len();
133 }
134
135 size
136}
137
/// Default memory budget for [`DefaultListFilesCache`]: 1 MiB.
pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024;

/// Default TTL for [`DefaultListFilesCache`] entries. `None` means entries do
/// not expire; they can still be evicted to respect the memory budget.
pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None;

/// Cache key: an object-store path, optionally scoped to the table it was
/// listed for.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct TableScopedPath {
    /// Owning table. `drop_table_entries` matches on this to remove all of a
    /// table's listings.
    pub table: Option<TableReference>,
    /// The listed object-store path.
    pub path: Path,
}
154
/// Mutable state of [`DefaultListFilesCache`], kept behind its mutex.
pub struct DefaultListFilesCacheState {
    /// Entries keyed by scoped path; `evict_entries` pops from it in LRU order.
    lru_queue: LruQueue<TableScopedPath, ListFilesEntry>,
    /// Budget in bytes; entries are evicted while `memory_used` exceeds it.
    memory_limit: usize,
    /// Sum of `size_bytes` over the entries currently in `lru_queue`.
    memory_used: usize,
    /// TTL stamped onto entries when they are inserted; `None` means no expiry.
    ttl: Option<Duration>,
}
162
163impl Default for DefaultListFilesCacheState {
164 fn default() -> Self {
165 Self {
166 lru_queue: LruQueue::new(),
167 memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
168 memory_used: 0,
169 ttl: DEFAULT_LIST_FILES_CACHE_TTL,
170 }
171 }
172}
173
174impl DFHeapSize for TableScopedPath {
175 fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
176 self.path.as_ref().heap_size(ctx) + self.table.heap_size(ctx)
177 }
178}
179
180impl Display for TableScopedPath {
181 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
182 if let Some(table) = &self.table {
183 write!(f, "{}, {}", self.path, table)
184 } else {
185 write!(f, "{}", self.path)
186 }
187 }
188}
189
190impl DefaultListFilesCacheState {
191 fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
192 Self {
193 lru_queue: LruQueue::new(),
194 memory_limit,
195 memory_used: 0,
196 ttl,
197 }
198 }
199
200 fn get(&mut self, key: &TableScopedPath, now: Instant) -> Option<CachedFileList> {
205 let entry = self.lru_queue.get(key)?;
206
207 if let Some(exp) = entry.expires
209 && now > exp
210 {
211 self.remove(key);
212 return None;
213 }
214
215 Some(entry.metas.clone())
216 }
217
218 fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool {
224 let Some(entry) = self.lru_queue.peek(k) else {
225 return false;
226 };
227
228 match entry.expires {
229 Some(exp) if now > exp => {
230 self.remove(k);
231 false
232 }
233 _ => true,
234 }
235 }
236
237 fn put(
243 &mut self,
244 key: &TableScopedPath,
245 value: CachedFileList,
246 now: Instant,
247 ) -> Option<CachedFileList> {
248 let entry = ListFilesEntry::try_new(value, self.ttl, now)?;
249 let entry_size = entry.size_bytes;
250
251 if entry_size > self.memory_limit {
253 return None;
254 }
255
256 let old_value = self.lru_queue.put(key.clone(), entry);
258 self.memory_used += entry_size;
259
260 if let Some(entry) = &old_value {
261 self.memory_used -= entry.size_bytes;
262 }
263
264 self.evict_entries();
265
266 old_value.map(|v| v.metas)
267 }
268
269 fn evict_entries(&mut self) {
271 while self.memory_used > self.memory_limit {
272 if let Some(removed) = self.lru_queue.pop() {
273 self.memory_used -= removed.1.size_bytes;
274 } else {
275 debug_assert!(
277 false,
278 "cache is empty while memory_used > memory_limit, cannot happen"
279 );
280 return;
281 }
282 }
283 }
284
285 fn remove(&mut self, k: &TableScopedPath) -> Option<CachedFileList> {
287 if let Some(entry) = self.lru_queue.remove(k) {
288 self.memory_used -= entry.size_bytes;
289 Some(entry.metas)
290 } else {
291 None
292 }
293 }
294
295 fn len(&self) -> usize {
297 self.lru_queue.len()
298 }
299
300 fn clear(&mut self) {
302 self.lru_queue.clear();
303 self.memory_used = 0;
304 }
305}
306
307impl CacheAccessor<TableScopedPath, CachedFileList> for DefaultListFilesCache {
308 fn get(&self, key: &TableScopedPath) -> Option<CachedFileList> {
309 let mut state = self.state.lock().unwrap();
310 let now = self.time_provider.now();
311 state.get(key, now)
312 }
313
314 fn put(
315 &self,
316 key: &TableScopedPath,
317 value: CachedFileList,
318 ) -> Option<CachedFileList> {
319 let mut state = self.state.lock().unwrap();
320 let now = self.time_provider.now();
321 state.put(key, value, now)
322 }
323
324 fn remove(&self, k: &TableScopedPath) -> Option<CachedFileList> {
325 let mut state = self.state.lock().unwrap();
326 state.remove(k)
327 }
328
329 fn contains_key(&self, k: &TableScopedPath) -> bool {
330 let mut state = self.state.lock().unwrap();
331 let now = self.time_provider.now();
332 state.contains_key(k, now)
333 }
334
335 fn len(&self) -> usize {
336 let state = self.state.lock().unwrap();
337 state.len()
338 }
339
340 fn clear(&self) {
341 let mut state = self.state.lock().unwrap();
342 state.clear();
343 }
344
345 fn name(&self) -> String {
346 String::from("DefaultListFilesCache")
347 }
348}
349
350impl ListFilesCache for DefaultListFilesCache {
351 fn cache_limit(&self) -> usize {
352 let state = self.state.lock().unwrap();
353 state.memory_limit
354 }
355
356 fn cache_ttl(&self) -> Option<Duration> {
357 let state = self.state.lock().unwrap();
358 state.ttl
359 }
360
361 fn update_cache_limit(&self, limit: usize) {
362 let mut state = self.state.lock().unwrap();
363 state.memory_limit = limit;
364 state.evict_entries();
365 }
366
367 fn update_cache_ttl(&self, ttl: Option<Duration>) {
368 let mut state = self.state.lock().unwrap();
369 state.ttl = ttl;
370 state.evict_entries();
371 }
372
373 fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry> {
374 let state = self.state.lock().unwrap();
375 let mut entries = HashMap::<TableScopedPath, ListFilesEntry>::new();
376 for (path, entry) in state.lru_queue.list_entries() {
377 entries.insert(path.clone(), entry.clone());
378 }
379 entries
380 }
381
382 fn drop_table_entries(
383 &self,
384 table_ref: &Option<TableReference>,
385 ) -> datafusion_common::Result<()> {
386 let mut state = self.state.lock().unwrap();
387 let mut table_paths = vec![];
388 for (path, _) in state.lru_queue.list_entries() {
389 if path.table == *table_ref {
390 table_paths.push(path.clone());
391 }
392 }
393 for path in table_paths {
394 state.remove(&path);
395 }
396 Ok(())
397 }
398}
399
400#[cfg(test)]
401mod tests {
402 use super::*;
403 use chrono::DateTime;
404 use std::thread;
405
406 struct MockTimeProvider {
407 base: Instant,
408 offset: Mutex<Duration>,
409 }
410
411 impl MockTimeProvider {
412 fn new() -> Self {
413 Self {
414 base: Instant::now(),
415 offset: Mutex::new(Duration::ZERO),
416 }
417 }
418
419 fn inc(&self, duration: Duration) {
420 let mut offset = self.offset.lock().unwrap();
421 *offset += duration;
422 }
423 }
424
425 impl TimeProvider for MockTimeProvider {
426 fn now(&self) -> Instant {
427 self.base + *self.offset.lock().unwrap()
428 }
429 }
430
431 fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta {
433 let location_str = if location_size > path.len() {
435 format!("{}{}", path, "0".repeat(location_size - path.len()))
436 } else {
437 path.to_string()
438 };
439
440 ObjectMeta {
441 location: Path::from(location_str),
442 last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
443 .unwrap()
444 .into(),
445 size: 1024,
446 e_tag: None,
447 version: None,
448 }
449 }
450
451 fn create_test_list_files_entry(
453 path: &str,
454 count: usize,
455 meta_size: usize,
456 ) -> (Path, CachedFileList, usize) {
457 let metas: Vec<ObjectMeta> = (0..count)
458 .map(|i| create_test_object_meta(&format!("file{i}"), meta_size))
459 .collect();
460
461 let size = (metas.capacity() * size_of::<ObjectMeta>())
463 + metas.iter().map(meta_heap_bytes).sum::<usize>();
464
465 (Path::from(path), CachedFileList::new(metas), size)
466 }
467
468 #[test]
469 fn test_basic_operations() {
470 let cache = DefaultListFilesCache::default();
471 let table_ref = Some(TableReference::from("table"));
472 let path = Path::from("test_path");
473 let key = TableScopedPath {
474 table: table_ref.clone(),
475 path,
476 };
477
478 assert!(!cache.contains_key(&key));
480 assert_eq!(cache.len(), 0);
481
482 assert!(cache.get(&key).is_none());
484
485 let meta = create_test_object_meta("file1", 50);
487 cache.put(&key, CachedFileList::new(vec![meta]));
488
489 assert!(cache.contains_key(&key));
491 assert_eq!(cache.len(), 1);
492 let result = cache.get(&key).unwrap();
493 assert_eq!(result.files.len(), 1);
494
495 let removed = cache.remove(&key).unwrap();
497 assert_eq!(removed.files.len(), 1);
498 assert!(!cache.contains_key(&key));
499 assert_eq!(cache.len(), 0);
500
501 let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
503 let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50);
504 let key1 = TableScopedPath {
505 table: table_ref.clone(),
506 path: path1,
507 };
508 let key2 = TableScopedPath {
509 table: table_ref,
510 path: path2,
511 };
512 cache.put(&key1, value1.clone());
513 cache.put(&key2, value2.clone());
514 assert_eq!(cache.len(), 2);
515
516 assert_eq!(
518 cache.list_entries(),
519 HashMap::from([
520 (
521 key1.clone(),
522 ListFilesEntry {
523 metas: value1,
524 size_bytes: size1,
525 expires: None,
526 }
527 ),
528 (
529 key2.clone(),
530 ListFilesEntry {
531 metas: value2,
532 size_bytes: size2,
533 expires: None,
534 }
535 )
536 ])
537 );
538
539 cache.clear();
541 assert_eq!(cache.len(), 0);
542 assert!(!cache.contains_key(&key1));
543 assert!(!cache.contains_key(&key2));
544 }
545
546 #[test]
547 fn test_lru_eviction_basic() {
548 let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
549 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
550 let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
551
552 let cache = DefaultListFilesCache::new(size * 3, None);
554
555 let table_ref = Some(TableReference::from("table"));
556 let key1 = TableScopedPath {
557 table: table_ref.clone(),
558 path: path1,
559 };
560 let key2 = TableScopedPath {
561 table: table_ref.clone(),
562 path: path2,
563 };
564 let key3 = TableScopedPath {
565 table: table_ref.clone(),
566 path: path3,
567 };
568
569 cache.put(&key1, value1);
571 cache.put(&key2, value2);
572 cache.put(&key3, value3);
573 assert_eq!(cache.len(), 3);
574 assert!(cache.contains_key(&key1));
575 assert!(cache.contains_key(&key2));
576 assert!(cache.contains_key(&key3));
577
578 let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
580 let key4 = TableScopedPath {
581 table: table_ref,
582 path: path4,
583 };
584 cache.put(&key4, value4);
585
586 assert_eq!(cache.len(), 3);
587 assert!(!cache.contains_key(&key1)); assert!(cache.contains_key(&key2));
589 assert!(cache.contains_key(&key3));
590 assert!(cache.contains_key(&key4));
591 }
592
593 #[test]
594 fn test_lru_ordering_after_access() {
595 let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
596 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
597 let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
598
599 let cache = DefaultListFilesCache::new(size * 3, None);
601
602 let table_ref = Some(TableReference::from("table"));
603 let key1 = TableScopedPath {
604 table: table_ref.clone(),
605 path: path1,
606 };
607 let key2 = TableScopedPath {
608 table: table_ref.clone(),
609 path: path2,
610 };
611 let key3 = TableScopedPath {
612 table: table_ref.clone(),
613 path: path3,
614 };
615
616 cache.put(&key1, value1);
617 cache.put(&key2, value2);
618 cache.put(&key3, value3);
619 assert_eq!(cache.len(), 3);
620
621 let _ = cache.get(&key1);
624
625 let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
627 let key4 = TableScopedPath {
628 table: table_ref,
629 path: path4,
630 };
631 cache.put(&key4, value4);
632
633 assert_eq!(cache.len(), 3);
634 assert!(cache.contains_key(&key1)); assert!(!cache.contains_key(&key2)); assert!(cache.contains_key(&key3));
637 assert!(cache.contains_key(&key4));
638 }
639
640 #[test]
641 fn test_reject_too_large() {
642 let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
643 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
644
645 let cache = DefaultListFilesCache::new(size * 2, None);
647
648 let table_ref = Some(TableReference::from("table"));
649 let key1 = TableScopedPath {
650 table: table_ref.clone(),
651 path: path1,
652 };
653 let key2 = TableScopedPath {
654 table: table_ref.clone(),
655 path: path2,
656 };
657 cache.put(&key1, value1);
658 cache.put(&key2, value2);
659 assert_eq!(cache.len(), 2);
660
661 let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000);
664 let key_large = TableScopedPath {
665 table: table_ref,
666 path: path_large,
667 };
668 cache.put(&key_large, value_large);
669
670 assert!(!cache.contains_key(&key_large));
672 assert_eq!(cache.len(), 2);
673 assert!(cache.contains_key(&key1));
674 assert!(cache.contains_key(&key2));
675 }
676
677 #[test]
678 fn test_multiple_evictions() {
679 let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
680 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
681 let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
682
683 let cache = DefaultListFilesCache::new(size * 3, None);
685
686 let table_ref = Some(TableReference::from("table"));
687 let key1 = TableScopedPath {
688 table: table_ref.clone(),
689 path: path1,
690 };
691 let key2 = TableScopedPath {
692 table: table_ref.clone(),
693 path: path2,
694 };
695 let key3 = TableScopedPath {
696 table: table_ref.clone(),
697 path: path3,
698 };
699 cache.put(&key1, value1);
700 cache.put(&key2, value2);
701 cache.put(&key3, value3);
702 assert_eq!(cache.len(), 3);
703
704 let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200);
706 let key_large = TableScopedPath {
707 table: table_ref,
708 path: path_large,
709 };
710 cache.put(&key_large, value_large);
711
712 assert_eq!(cache.len(), 2);
714 assert!(!cache.contains_key(&key1)); assert!(!cache.contains_key(&key2)); assert!(cache.contains_key(&key3));
717 assert!(cache.contains_key(&key_large));
718 }
719
720 #[test]
721 fn test_cache_limit_resize() {
722 let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
723 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
724 let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
725
726 let cache = DefaultListFilesCache::new(size * 3, None);
727
728 let table_ref = Some(TableReference::from("table"));
729 let key1 = TableScopedPath {
730 table: table_ref.clone(),
731 path: path1,
732 };
733 let key2 = TableScopedPath {
734 table: table_ref.clone(),
735 path: path2,
736 };
737 let key3 = TableScopedPath {
738 table: table_ref,
739 path: path3,
740 };
741 cache.put(&key1, value1);
743 cache.put(&key2, value2);
744 cache.put(&key3, value3);
745 assert_eq!(cache.len(), 3);
746
747 cache.update_cache_limit(size);
749
750 assert_eq!(cache.len(), 1);
752 assert!(cache.contains_key(&key3));
753 assert!(!cache.contains_key(&key1));
755 assert!(!cache.contains_key(&key2));
756 }
757
758 #[test]
759 fn test_entry_update_with_size_change() {
760 let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
761 let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100);
762 let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100);
763
764 let cache = DefaultListFilesCache::new(size * 3, None);
765
766 let table_ref = Some(TableReference::from("table"));
767 let key1 = TableScopedPath {
768 table: table_ref.clone(),
769 path: path1,
770 };
771 let key2 = TableScopedPath {
772 table: table_ref.clone(),
773 path: path2,
774 };
775 let key3 = TableScopedPath {
776 table: table_ref,
777 path: path3,
778 };
779 cache.put(&key1, value1);
781 cache.put(&key2, value2.clone());
782 cache.put(&key3, value3_v1);
783 assert_eq!(cache.len(), 3);
784
785 let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100);
787 cache.put(&key3, value3_v2);
788
789 assert_eq!(cache.len(), 3);
790 assert!(cache.contains_key(&key1));
791 assert!(cache.contains_key(&key2));
792 assert!(cache.contains_key(&key3));
793
794 let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200);
796 cache.put(&key3, value3_v3.clone());
797
798 assert_eq!(cache.len(), 2);
799 assert!(!cache.contains_key(&key1)); assert!(cache.contains_key(&key2));
801 assert!(cache.contains_key(&key3));
802
803 assert_eq!(
805 cache.list_entries(),
806 HashMap::from([
807 (
808 key2,
809 ListFilesEntry {
810 metas: value2,
811 size_bytes: size2,
812 expires: None,
813 }
814 ),
815 (
816 key3,
817 ListFilesEntry {
818 metas: value3_v3,
819 size_bytes: size3_v3,
820 expires: None,
821 }
822 )
823 ])
824 );
825 }
826
827 #[test]
828 fn test_cache_with_ttl() {
829 let ttl = Duration::from_millis(100);
830
831 let mock_time = Arc::new(MockTimeProvider::new());
832 let cache = DefaultListFilesCache::new(10000, Some(ttl))
833 .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
834
835 let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
836 let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);
837
838 let table_ref = Some(TableReference::from("table"));
839 let key1 = TableScopedPath {
840 table: table_ref.clone(),
841 path: path1,
842 };
843 let key2 = TableScopedPath {
844 table: table_ref,
845 path: path2,
846 };
847 cache.put(&key1, value1.clone());
848 cache.put(&key2, value2.clone());
849
850 assert!(cache.get(&key1).is_some());
852 assert!(cache.get(&key2).is_some());
853 assert_eq!(
855 cache.list_entries(),
856 HashMap::from([
857 (
858 key1.clone(),
859 ListFilesEntry {
860 metas: value1,
861 size_bytes: size1,
862 expires: mock_time.now().checked_add(ttl),
863 }
864 ),
865 (
866 key2.clone(),
867 ListFilesEntry {
868 metas: value2,
869 size_bytes: size2,
870 expires: mock_time.now().checked_add(ttl),
871 }
872 )
873 ])
874 );
875 mock_time.inc(Duration::from_millis(150));
877
878 assert!(!cache.contains_key(&key1));
880 assert_eq!(cache.len(), 1); assert!(!cache.contains_key(&key2));
882 assert_eq!(cache.len(), 0); }
884
885 #[test]
886 fn test_cache_with_ttl_and_lru() {
887 let ttl = Duration::from_millis(200);
888
889 let mock_time = Arc::new(MockTimeProvider::new());
890 let cache = DefaultListFilesCache::new(1000, Some(ttl))
891 .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
892
893 let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400);
894 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
895 let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);
896
897 let table_ref = Some(TableReference::from("table"));
898 let key1 = TableScopedPath {
899 table: table_ref.clone(),
900 path: path1,
901 };
902 let key2 = TableScopedPath {
903 table: table_ref.clone(),
904 path: path2,
905 };
906 let key3 = TableScopedPath {
907 table: table_ref,
908 path: path3,
909 };
910 cache.put(&key1, value1);
911 mock_time.inc(Duration::from_millis(50));
912 cache.put(&key2, value2);
913 mock_time.inc(Duration::from_millis(50));
914
915 cache.put(&key3, value3);
917 assert!(!cache.contains_key(&key1)); assert!(cache.contains_key(&key2));
919 assert!(cache.contains_key(&key3));
920
921 mock_time.inc(Duration::from_millis(151));
922
923 assert!(!cache.contains_key(&key2)); assert!(cache.contains_key(&key3)); }
926
927 #[test]
928 fn test_ttl_expiration_in_get() {
929 let ttl = Duration::from_millis(100);
930 let cache = DefaultListFilesCache::new(10000, Some(ttl));
931
932 let (path, value, _) = create_test_list_files_entry("path", 2, 50);
933 let table_ref = Some(TableReference::from("table"));
934 let key = TableScopedPath {
935 table: table_ref,
936 path,
937 };
938
939 cache.put(&key, value.clone());
941
942 let result = cache.get(&key);
944 assert!(result.is_some());
945 assert_eq!(result.unwrap().files.len(), 2);
946
947 thread::sleep(Duration::from_millis(150));
949
950 let result2 = cache.get(&key);
952 assert!(result2.is_none());
953 }
954
955 #[test]
956 fn test_meta_heap_bytes_calculation() {
957 let meta1 = ObjectMeta {
959 location: Path::from("test"),
960 last_modified: chrono::Utc::now(),
961 size: 100,
962 e_tag: None,
963 version: None,
964 };
965 assert_eq!(meta_heap_bytes(&meta1), 4); let meta2 = ObjectMeta {
969 location: Path::from("test"),
970 last_modified: chrono::Utc::now(),
971 size: 100,
972 e_tag: Some("etag123".to_string()),
973 version: None,
974 };
975 assert_eq!(meta_heap_bytes(&meta2), 4 + 7); let meta3 = ObjectMeta {
979 location: Path::from("test"),
980 last_modified: chrono::Utc::now(),
981 size: 100,
982 e_tag: None,
983 version: Some("v1.0".to_string()),
984 };
985 assert_eq!(meta_heap_bytes(&meta3), 4 + 4); let meta4 = ObjectMeta {
989 location: Path::from("test"),
990 last_modified: chrono::Utc::now(),
991 size: 100,
992 e_tag: Some("tag".to_string()),
993 version: Some("ver".to_string()),
994 };
995 assert_eq!(meta_heap_bytes(&meta4), 4 + 3 + 3); }
997
998 #[test]
999 fn test_entry_creation() {
1000 let empty_list = CachedFileList::new(vec![]);
1002 let now = Instant::now();
1003 let entry = ListFilesEntry::try_new(empty_list, None, now);
1004 assert!(entry.is_none());
1005
1006 let metas: Vec<ObjectMeta> = (0..5)
1008 .map(|i| create_test_object_meta(&format!("file{i}"), 30))
1009 .collect();
1010 let cached_list = CachedFileList::new(metas);
1011 let entry = ListFilesEntry::try_new(cached_list, None, now).unwrap();
1012 assert_eq!(entry.metas.files.len(), 5);
1013 let expected_size = (entry.metas.files.capacity() * size_of::<ObjectMeta>())
1015 + (entry.metas.files.len() * 30);
1016 assert_eq!(entry.size_bytes, expected_size);
1017
1018 let meta = create_test_object_meta("file", 50);
1020 let ttl = Duration::from_secs(10);
1021 let cached_list = CachedFileList::new(vec![meta]);
1022 let entry = ListFilesEntry::try_new(cached_list, Some(ttl), now).unwrap();
1023 assert!(entry.expires.unwrap() > now);
1024 }
1025
1026 #[test]
1027 fn test_memory_tracking() {
1028 let cache = DefaultListFilesCache::new(1000, None);
1029
1030 {
1032 let state = cache.state.lock().unwrap();
1033 assert_eq!(state.memory_used, 0);
1034 }
1035
1036 let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100);
1038 let table_ref = Some(TableReference::from("table"));
1039 let key1 = TableScopedPath {
1040 table: table_ref.clone(),
1041 path: path1,
1042 };
1043 cache.put(&key1, value1);
1044 {
1045 let state = cache.state.lock().unwrap();
1046 assert_eq!(state.memory_used, size1);
1047 }
1048
1049 let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200);
1051 let key2 = TableScopedPath {
1052 table: table_ref.clone(),
1053 path: path2,
1054 };
1055 cache.put(&key2, value2);
1056 {
1057 let state = cache.state.lock().unwrap();
1058 assert_eq!(state.memory_used, size1 + size2);
1059 }
1060
1061 cache.remove(&key1);
1063 {
1064 let state = cache.state.lock().unwrap();
1065 assert_eq!(state.memory_used, size2);
1066 }
1067
1068 cache.clear();
1070 {
1071 let state = cache.state.lock().unwrap();
1072 assert_eq!(state.memory_used, 0);
1073 }
1074 }
1075
1076 fn create_object_meta_with_path(location: &str) -> ObjectMeta {
1080 ObjectMeta {
1081 location: Path::from(location),
1082 last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
1083 .unwrap()
1084 .into(),
1085 size: 1024,
1086 e_tag: None,
1087 version: None,
1088 }
1089 }
1090
1091 #[test]
1092 fn test_prefix_filtering() {
1093 let cache = DefaultListFilesCache::new(100000, None);
1094
1095 let table_base = Path::from("my_table");
1097 let files = vec![
1098 create_object_meta_with_path("my_table/a=1/file1.parquet"),
1099 create_object_meta_with_path("my_table/a=1/file2.parquet"),
1100 create_object_meta_with_path("my_table/a=2/file3.parquet"),
1101 create_object_meta_with_path("my_table/a=2/file4.parquet"),
1102 ];
1103
1104 let table_ref = Some(TableReference::from("table"));
1106 let key = TableScopedPath {
1107 table: table_ref,
1108 path: table_base,
1109 };
1110 cache.put(&key, CachedFileList::new(files));
1111
1112 let result = cache.get(&key).unwrap();
1113
1114 let prefix_a1 = Some(Path::from("my_table/a=1"));
1116 let filtered = result.files_matching_prefix(&prefix_a1);
1117 assert_eq!(filtered.len(), 2);
1118 assert!(
1119 filtered
1120 .iter()
1121 .all(|m| m.location.as_ref().starts_with("my_table/a=1"))
1122 );
1123
1124 let prefix_a2 = Some(Path::from("my_table/a=2"));
1126 let filtered_2 = result.files_matching_prefix(&prefix_a2);
1127 assert_eq!(filtered_2.len(), 2);
1128 assert!(
1129 filtered_2
1130 .iter()
1131 .all(|m| m.location.as_ref().starts_with("my_table/a=2"))
1132 );
1133
1134 let all = result.files_matching_prefix(&None);
1136 assert_eq!(all.len(), 4);
1137 }
1138
1139 #[test]
1140 fn test_prefix_no_matching_files() {
1141 let cache = DefaultListFilesCache::new(100000, None);
1142
1143 let table_base = Path::from("my_table");
1144 let files = vec![
1145 create_object_meta_with_path("my_table/a=1/file1.parquet"),
1146 create_object_meta_with_path("my_table/a=2/file2.parquet"),
1147 ];
1148
1149 let table_ref = Some(TableReference::from("table"));
1150 let key = TableScopedPath {
1151 table: table_ref,
1152 path: table_base,
1153 };
1154 cache.put(&key, CachedFileList::new(files));
1155 let result = cache.get(&key).unwrap();
1156
1157 let prefix_a3 = Some(Path::from("my_table/a=3"));
1159 let filtered = result.files_matching_prefix(&prefix_a3);
1160 assert!(filtered.is_empty());
1161 }
1162
1163 #[test]
1164 fn test_nested_partitions() {
1165 let cache = DefaultListFilesCache::new(100000, None);
1166
1167 let table_base = Path::from("events");
1168 let files = vec![
1169 create_object_meta_with_path(
1170 "events/year=2024/month=01/day=01/file1.parquet",
1171 ),
1172 create_object_meta_with_path(
1173 "events/year=2024/month=01/day=02/file2.parquet",
1174 ),
1175 create_object_meta_with_path(
1176 "events/year=2024/month=02/day=01/file3.parquet",
1177 ),
1178 create_object_meta_with_path(
1179 "events/year=2025/month=01/day=01/file4.parquet",
1180 ),
1181 ];
1182
1183 let table_ref = Some(TableReference::from("table"));
1184 let key = TableScopedPath {
1185 table: table_ref,
1186 path: table_base,
1187 };
1188 cache.put(&key, CachedFileList::new(files));
1189 let result = cache.get(&key).unwrap();
1190
1191 let prefix_month = Some(Path::from("events/year=2024/month=01"));
1193 let filtered = result.files_matching_prefix(&prefix_month);
1194 assert_eq!(filtered.len(), 2);
1195
1196 let prefix_year = Some(Path::from("events/year=2024"));
1198 let filtered_year = result.files_matching_prefix(&prefix_year);
1199 assert_eq!(filtered_year.len(), 3);
1200 }
1201
1202 #[test]
1203 fn test_drop_table_entries() {
1204 let cache = DefaultListFilesCache::default();
1205
1206 let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100);
1207 let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
1208 let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
1209
1210 let table_ref1 = Some(TableReference::from("table1"));
1211 let key1 = TableScopedPath {
1212 table: table_ref1.clone(),
1213 path: path1,
1214 };
1215 let key2 = TableScopedPath {
1216 table: table_ref1.clone(),
1217 path: path2,
1218 };
1219
1220 let table_ref2 = Some(TableReference::from("table2"));
1221 let key3 = TableScopedPath {
1222 table: table_ref2.clone(),
1223 path: path3,
1224 };
1225
1226 cache.put(&key1, value1);
1227 cache.put(&key2, value2);
1228 cache.put(&key3, value3);
1229
1230 cache.drop_table_entries(&table_ref1).unwrap();
1231
1232 assert!(!cache.contains_key(&key1));
1233 assert!(!cache.contains_key(&key2));
1234 assert!(cache.contains_key(&key3));
1235 }
1236}