Skip to main content

datafusion_execution/cache/
list_files_cache.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem::size_of;
19use std::{
20    collections::HashMap,
21    sync::{Arc, Mutex},
22    time::Duration,
23};
24
25use datafusion_common::TableReference;
26use datafusion_common::instant::Instant;
27use object_store::{ObjectMeta, path::Path};
28
29use crate::cache::{
30    CacheAccessor,
31    cache_manager::{CachedFileList, ListFilesCache},
32    lru_queue::LruQueue,
33};
34
/// Clock used by [`DefaultListFilesCache`] to stamp entry expiry and to decide
/// whether an entry has expired.
///
/// It is a trait so that tests can plug in a controllable clock.
pub trait TimeProvider: 'static + Send + Sync {
    /// Returns the current instant.
    fn now(&self) -> Instant;
}

/// [`TimeProvider`] that reads the real clock via [`Instant::now`].
#[derive(Default, Debug)]
pub struct SystemTimeProvider;

impl TimeProvider for SystemTimeProvider {
    fn now(&self) -> Instant {
        Instant::now()
    }
}
47
48/// Default implementation of [`ListFilesCache`]
49///
50/// Caches file metadata for file listing operations.
51///
52/// # Internal details
53///
54/// The `memory_limit` parameter controls the maximum size of the cache, which uses a Least
55/// Recently Used eviction algorithm. When adding a new entry, if the total number of entries in
56/// the cache exceeds `memory_limit`, the least recently used entries are evicted until the total
57/// size is lower than the `memory_limit`.
58///
59/// # Cache API
60///
61/// Uses `get` and `put` methods for cache operations. TTL validation is handled internally -
62/// expired entries return `None` from `get`.
63pub struct DefaultListFilesCache {
64    state: Mutex<DefaultListFilesCacheState>,
65    time_provider: Arc<dyn TimeProvider>,
66}
67
68impl Default for DefaultListFilesCache {
69    fn default() -> Self {
70        Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None)
71    }
72}
73
74impl DefaultListFilesCache {
75    /// Creates a new instance of [`DefaultListFilesCache`].
76    ///
77    /// # Arguments
78    /// * `memory_limit` - The maximum size of the cache, in bytes.
79    /// * `ttl` - The TTL (time-to-live) of entries in the cache.
80    pub fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
81        Self {
82            state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)),
83            time_provider: Arc::new(SystemTimeProvider),
84        }
85    }
86
87    #[cfg(test)]
88    pub(crate) fn with_time_provider(mut self, provider: Arc<dyn TimeProvider>) -> Self {
89        self.time_provider = provider;
90        self
91    }
92}
93
94#[derive(Clone, PartialEq, Debug)]
95pub struct ListFilesEntry {
96    pub metas: CachedFileList,
97    pub size_bytes: usize,
98    pub expires: Option<Instant>,
99}
100
101impl ListFilesEntry {
102    fn try_new(
103        cached_file_list: CachedFileList,
104        ttl: Option<Duration>,
105        now: Instant,
106    ) -> Option<Self> {
107        let size_bytes = (cached_file_list.files.capacity() * size_of::<ObjectMeta>())
108            + cached_file_list
109                .files
110                .iter()
111                .map(meta_heap_bytes)
112                .reduce(|acc, b| acc + b)?;
113
114        Some(Self {
115            metas: cached_file_list,
116            size_bytes,
117            expires: ttl.map(|t| now + t),
118        })
119    }
120}
121
122/// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap.
123fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
124    let mut size = object_meta.location.as_ref().len();
125
126    if let Some(e) = &object_meta.e_tag {
127        size += e.len();
128    }
129    if let Some(v) = &object_meta.version {
130        size += v.len();
131    }
132
133    size
134}
135
/// Default memory budget of a [`DefaultListFilesCache`]: 1 MiB.
pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1 << 20;

/// Default time-to-live of [`DefaultListFilesCache`] entries: `None`, i.e. entries never
/// expire.
pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None;
141
142/// Key for [`DefaultListFilesCache`]
143///
144/// Each entry is scoped to its use within a specific table so that the cache
145/// can differentiate between identical paths in different tables, and
146/// table-level cache invalidation.
147#[derive(PartialEq, Eq, Hash, Clone, Debug)]
148pub struct TableScopedPath {
149    pub table: Option<TableReference>,
150    pub path: Path,
151}
152
153/// Handles the inner state of the [`DefaultListFilesCache`] struct.
154pub struct DefaultListFilesCacheState {
155    lru_queue: LruQueue<TableScopedPath, ListFilesEntry>,
156    memory_limit: usize,
157    memory_used: usize,
158    ttl: Option<Duration>,
159}
160
161impl Default for DefaultListFilesCacheState {
162    fn default() -> Self {
163        Self {
164            lru_queue: LruQueue::new(),
165            memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
166            memory_used: 0,
167            ttl: DEFAULT_LIST_FILES_CACHE_TTL,
168        }
169    }
170}
171
172impl DefaultListFilesCacheState {
173    fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
174        Self {
175            lru_queue: LruQueue::new(),
176            memory_limit,
177            memory_used: 0,
178            ttl,
179        }
180    }
181
182    /// Gets an entry from the cache, checking for expiration.
183    ///
184    /// Returns the cached file list if it exists and hasn't expired.
185    /// If the entry has expired, it is removed from the cache.
186    fn get(&mut self, key: &TableScopedPath, now: Instant) -> Option<CachedFileList> {
187        let entry = self.lru_queue.get(key)?;
188
189        // Check expiration
190        if let Some(exp) = entry.expires
191            && now > exp
192        {
193            self.remove(key);
194            return None;
195        }
196
197        Some(entry.metas.clone())
198    }
199
200    /// Checks if the respective entry is currently cached.
201    ///
202    /// If the entry has expired by `now` it is removed from the cache.
203    ///
204    /// The LRU queue is not updated.
205    fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool {
206        let Some(entry) = self.lru_queue.peek(k) else {
207            return false;
208        };
209
210        match entry.expires {
211            Some(exp) if now > exp => {
212                self.remove(k);
213                false
214            }
215            _ => true,
216        }
217    }
218
219    /// Adds a new key-value pair to cache expiring at `now` + the TTL.
220    ///
221    /// This means that LRU entries might be evicted if required.
222    /// If the key is already in the cache, the previous entry is returned.
223    /// If the size of the entry is greater than the `memory_limit`, the value is not inserted.
224    fn put(
225        &mut self,
226        key: &TableScopedPath,
227        value: CachedFileList,
228        now: Instant,
229    ) -> Option<CachedFileList> {
230        let entry = ListFilesEntry::try_new(value, self.ttl, now)?;
231        let entry_size = entry.size_bytes;
232
233        // no point in trying to add this value to the cache if it cannot fit entirely
234        if entry_size > self.memory_limit {
235            return None;
236        }
237
238        // if the key is already in the cache, the old value is removed
239        let old_value = self.lru_queue.put(key.clone(), entry);
240        self.memory_used += entry_size;
241
242        if let Some(entry) = &old_value {
243            self.memory_used -= entry.size_bytes;
244        }
245
246        self.evict_entries();
247
248        old_value.map(|v| v.metas)
249    }
250
251    /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`.
252    fn evict_entries(&mut self) {
253        while self.memory_used > self.memory_limit {
254            if let Some(removed) = self.lru_queue.pop() {
255                self.memory_used -= removed.1.size_bytes;
256            } else {
257                // cache is empty while memory_used > memory_limit, cannot happen
258                debug_assert!(
259                    false,
260                    "cache is empty while memory_used > memory_limit, cannot happen"
261                );
262                return;
263            }
264        }
265    }
266
267    /// Removes an entry from the cache and returns it, if it exists.
268    fn remove(&mut self, k: &TableScopedPath) -> Option<CachedFileList> {
269        if let Some(entry) = self.lru_queue.remove(k) {
270            self.memory_used -= entry.size_bytes;
271            Some(entry.metas)
272        } else {
273            None
274        }
275    }
276
277    /// Returns the number of entries currently cached.
278    fn len(&self) -> usize {
279        self.lru_queue.len()
280    }
281
282    /// Removes all entries from the cache.
283    fn clear(&mut self) {
284        self.lru_queue.clear();
285        self.memory_used = 0;
286    }
287}
288
289impl CacheAccessor<TableScopedPath, CachedFileList> for DefaultListFilesCache {
290    fn get(&self, key: &TableScopedPath) -> Option<CachedFileList> {
291        let mut state = self.state.lock().unwrap();
292        let now = self.time_provider.now();
293        state.get(key, now)
294    }
295
296    fn put(
297        &self,
298        key: &TableScopedPath,
299        value: CachedFileList,
300    ) -> Option<CachedFileList> {
301        let mut state = self.state.lock().unwrap();
302        let now = self.time_provider.now();
303        state.put(key, value, now)
304    }
305
306    fn remove(&self, k: &TableScopedPath) -> Option<CachedFileList> {
307        let mut state = self.state.lock().unwrap();
308        state.remove(k)
309    }
310
311    fn contains_key(&self, k: &TableScopedPath) -> bool {
312        let mut state = self.state.lock().unwrap();
313        let now = self.time_provider.now();
314        state.contains_key(k, now)
315    }
316
317    fn len(&self) -> usize {
318        let state = self.state.lock().unwrap();
319        state.len()
320    }
321
322    fn clear(&self) {
323        let mut state = self.state.lock().unwrap();
324        state.clear();
325    }
326
327    fn name(&self) -> String {
328        String::from("DefaultListFilesCache")
329    }
330}
331
332impl ListFilesCache for DefaultListFilesCache {
333    fn cache_limit(&self) -> usize {
334        let state = self.state.lock().unwrap();
335        state.memory_limit
336    }
337
338    fn cache_ttl(&self) -> Option<Duration> {
339        let state = self.state.lock().unwrap();
340        state.ttl
341    }
342
343    fn update_cache_limit(&self, limit: usize) {
344        let mut state = self.state.lock().unwrap();
345        state.memory_limit = limit;
346        state.evict_entries();
347    }
348
349    fn update_cache_ttl(&self, ttl: Option<Duration>) {
350        let mut state = self.state.lock().unwrap();
351        state.ttl = ttl;
352        state.evict_entries();
353    }
354
355    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry> {
356        let state = self.state.lock().unwrap();
357        let mut entries = HashMap::<TableScopedPath, ListFilesEntry>::new();
358        for (path, entry) in state.lru_queue.list_entries() {
359            entries.insert(path.clone(), entry.clone());
360        }
361        entries
362    }
363
364    fn drop_table_entries(
365        &self,
366        table_ref: &Option<TableReference>,
367    ) -> datafusion_common::Result<()> {
368        let mut state = self.state.lock().unwrap();
369        let mut table_paths = vec![];
370        for (path, _) in state.lru_queue.list_entries() {
371            if path.table == *table_ref {
372                table_paths.push(path.clone());
373            }
374        }
375        for path in table_paths {
376            state.remove(&path);
377        }
378        Ok(())
379    }
380}
381
382#[cfg(test)]
383mod tests {
384    use super::*;
385    use chrono::DateTime;
386    use std::thread;
387
388    struct MockTimeProvider {
389        base: Instant,
390        offset: Mutex<Duration>,
391    }
392
393    impl MockTimeProvider {
394        fn new() -> Self {
395            Self {
396                base: Instant::now(),
397                offset: Mutex::new(Duration::ZERO),
398            }
399        }
400
401        fn inc(&self, duration: Duration) {
402            let mut offset = self.offset.lock().unwrap();
403            *offset += duration;
404        }
405    }
406
407    impl TimeProvider for MockTimeProvider {
408        fn now(&self) -> Instant {
409            self.base + *self.offset.lock().unwrap()
410        }
411    }
412
413    /// Helper function to create a test ObjectMeta with a specific path and location string size
414    fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta {
415        // Create a location string of the desired size by padding with zeros
416        let location_str = if location_size > path.len() {
417            format!("{}{}", path, "0".repeat(location_size - path.len()))
418        } else {
419            path.to_string()
420        };
421
422        ObjectMeta {
423            location: Path::from(location_str),
424            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
425                .unwrap()
426                .into(),
427            size: 1024,
428            e_tag: None,
429            version: None,
430        }
431    }
432
433    /// Helper function to create a CachedFileList with at least meta_size bytes
434    fn create_test_list_files_entry(
435        path: &str,
436        count: usize,
437        meta_size: usize,
438    ) -> (Path, CachedFileList, usize) {
439        let metas: Vec<ObjectMeta> = (0..count)
440            .map(|i| create_test_object_meta(&format!("file{i}"), meta_size))
441            .collect();
442
443        // Calculate actual size using the same logic as ListFilesEntry::try_new
444        let size = (metas.capacity() * size_of::<ObjectMeta>())
445            + metas.iter().map(meta_heap_bytes).sum::<usize>();
446
447        (Path::from(path), CachedFileList::new(metas), size)
448    }
449
450    #[test]
451    fn test_basic_operations() {
452        let cache = DefaultListFilesCache::default();
453        let table_ref = Some(TableReference::from("table"));
454        let path = Path::from("test_path");
455        let key = TableScopedPath {
456            table: table_ref.clone(),
457            path,
458        };
459
460        // Initially cache is empty
461        assert!(!cache.contains_key(&key));
462        assert_eq!(cache.len(), 0);
463
464        // Cache miss - get returns None
465        assert!(cache.get(&key).is_none());
466
467        // Put a value
468        let meta = create_test_object_meta("file1", 50);
469        cache.put(&key, CachedFileList::new(vec![meta]));
470
471        // Entry should be cached
472        assert!(cache.contains_key(&key));
473        assert_eq!(cache.len(), 1);
474        let result = cache.get(&key).unwrap();
475        assert_eq!(result.files.len(), 1);
476
477        // Remove the entry
478        let removed = cache.remove(&key).unwrap();
479        assert_eq!(removed.files.len(), 1);
480        assert!(!cache.contains_key(&key));
481        assert_eq!(cache.len(), 0);
482
483        // Put multiple entries
484        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
485        let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50);
486        let key1 = TableScopedPath {
487            table: table_ref.clone(),
488            path: path1,
489        };
490        let key2 = TableScopedPath {
491            table: table_ref,
492            path: path2,
493        };
494        cache.put(&key1, value1.clone());
495        cache.put(&key2, value2.clone());
496        assert_eq!(cache.len(), 2);
497
498        // List cache entries
499        assert_eq!(
500            cache.list_entries(),
501            HashMap::from([
502                (
503                    key1.clone(),
504                    ListFilesEntry {
505                        metas: value1,
506                        size_bytes: size1,
507                        expires: None,
508                    }
509                ),
510                (
511                    key2.clone(),
512                    ListFilesEntry {
513                        metas: value2,
514                        size_bytes: size2,
515                        expires: None,
516                    }
517                )
518            ])
519        );
520
521        // Clear all entries
522        cache.clear();
523        assert_eq!(cache.len(), 0);
524        assert!(!cache.contains_key(&key1));
525        assert!(!cache.contains_key(&key2));
526    }
527
528    #[test]
529    fn test_lru_eviction_basic() {
530        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
531        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
532        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
533
534        // Set cache limit to exactly fit all three entries
535        let cache = DefaultListFilesCache::new(size * 3, None);
536
537        let table_ref = Some(TableReference::from("table"));
538        let key1 = TableScopedPath {
539            table: table_ref.clone(),
540            path: path1,
541        };
542        let key2 = TableScopedPath {
543            table: table_ref.clone(),
544            path: path2,
545        };
546        let key3 = TableScopedPath {
547            table: table_ref.clone(),
548            path: path3,
549        };
550
551        // All three entries should fit
552        cache.put(&key1, value1);
553        cache.put(&key2, value2);
554        cache.put(&key3, value3);
555        assert_eq!(cache.len(), 3);
556        assert!(cache.contains_key(&key1));
557        assert!(cache.contains_key(&key2));
558        assert!(cache.contains_key(&key3));
559
560        // Adding a new entry should evict path1 (LRU)
561        let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
562        let key4 = TableScopedPath {
563            table: table_ref,
564            path: path4,
565        };
566        cache.put(&key4, value4);
567
568        assert_eq!(cache.len(), 3);
569        assert!(!cache.contains_key(&key1)); // Evicted
570        assert!(cache.contains_key(&key2));
571        assert!(cache.contains_key(&key3));
572        assert!(cache.contains_key(&key4));
573    }
574
575    #[test]
576    fn test_lru_ordering_after_access() {
577        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
578        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
579        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
580
581        // Set cache limit to fit exactly three entries
582        let cache = DefaultListFilesCache::new(size * 3, None);
583
584        let table_ref = Some(TableReference::from("table"));
585        let key1 = TableScopedPath {
586            table: table_ref.clone(),
587            path: path1,
588        };
589        let key2 = TableScopedPath {
590            table: table_ref.clone(),
591            path: path2,
592        };
593        let key3 = TableScopedPath {
594            table: table_ref.clone(),
595            path: path3,
596        };
597
598        cache.put(&key1, value1);
599        cache.put(&key2, value2);
600        cache.put(&key3, value3);
601        assert_eq!(cache.len(), 3);
602
603        // Access path1 to move it to front (MRU)
604        // Order is now: path2 (LRU), path3, path1 (MRU)
605        let _ = cache.get(&key1);
606
607        // Adding a new entry should evict path2 (the LRU)
608        let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
609        let key4 = TableScopedPath {
610            table: table_ref,
611            path: path4,
612        };
613        cache.put(&key4, value4);
614
615        assert_eq!(cache.len(), 3);
616        assert!(cache.contains_key(&key1)); // Still present (recently accessed)
617        assert!(!cache.contains_key(&key2)); // Evicted (was LRU)
618        assert!(cache.contains_key(&key3));
619        assert!(cache.contains_key(&key4));
620    }
621
622    #[test]
623    fn test_reject_too_large() {
624        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
625        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
626
627        // Set cache limit to fit both entries
628        let cache = DefaultListFilesCache::new(size * 2, None);
629
630        let table_ref = Some(TableReference::from("table"));
631        let key1 = TableScopedPath {
632            table: table_ref.clone(),
633            path: path1,
634        };
635        let key2 = TableScopedPath {
636            table: table_ref.clone(),
637            path: path2,
638        };
639        cache.put(&key1, value1);
640        cache.put(&key2, value2);
641        assert_eq!(cache.len(), 2);
642
643        // Try to add an entry that's too large to fit in the cache
644        // The entry is not stored (too large)
645        let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000);
646        let key_large = TableScopedPath {
647            table: table_ref,
648            path: path_large,
649        };
650        cache.put(&key_large, value_large);
651
652        // Large entry should not be added
653        assert!(!cache.contains_key(&key_large));
654        assert_eq!(cache.len(), 2);
655        assert!(cache.contains_key(&key1));
656        assert!(cache.contains_key(&key2));
657    }
658
659    #[test]
660    fn test_multiple_evictions() {
661        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
662        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
663        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
664
665        // Set cache limit for exactly 3 entries
666        let cache = DefaultListFilesCache::new(size * 3, None);
667
668        let table_ref = Some(TableReference::from("table"));
669        let key1 = TableScopedPath {
670            table: table_ref.clone(),
671            path: path1,
672        };
673        let key2 = TableScopedPath {
674            table: table_ref.clone(),
675            path: path2,
676        };
677        let key3 = TableScopedPath {
678            table: table_ref.clone(),
679            path: path3,
680        };
681        cache.put(&key1, value1);
682        cache.put(&key2, value2);
683        cache.put(&key3, value3);
684        assert_eq!(cache.len(), 3);
685
686        // Add a large entry that requires evicting 2 entries
687        let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200);
688        let key_large = TableScopedPath {
689            table: table_ref,
690            path: path_large,
691        };
692        cache.put(&key_large, value_large);
693
694        // path1 and path2 should be evicted (both LRU), path3 and path_large remain
695        assert_eq!(cache.len(), 2);
696        assert!(!cache.contains_key(&key1)); // Evicted
697        assert!(!cache.contains_key(&key2)); // Evicted
698        assert!(cache.contains_key(&key3));
699        assert!(cache.contains_key(&key_large));
700    }
701
702    #[test]
703    fn test_cache_limit_resize() {
704        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
705        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
706        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
707
708        let cache = DefaultListFilesCache::new(size * 3, None);
709
710        let table_ref = Some(TableReference::from("table"));
711        let key1 = TableScopedPath {
712            table: table_ref.clone(),
713            path: path1,
714        };
715        let key2 = TableScopedPath {
716            table: table_ref.clone(),
717            path: path2,
718        };
719        let key3 = TableScopedPath {
720            table: table_ref,
721            path: path3,
722        };
723        // Add three entries
724        cache.put(&key1, value1);
725        cache.put(&key2, value2);
726        cache.put(&key3, value3);
727        assert_eq!(cache.len(), 3);
728
729        // Resize cache to only fit one entry
730        cache.update_cache_limit(size);
731
732        // Should keep only the most recent entry (path3, the MRU)
733        assert_eq!(cache.len(), 1);
734        assert!(cache.contains_key(&key3));
735        // Earlier entries (LRU) should be evicted
736        assert!(!cache.contains_key(&key1));
737        assert!(!cache.contains_key(&key2));
738    }
739
740    #[test]
741    fn test_entry_update_with_size_change() {
742        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
743        let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100);
744        let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100);
745
746        let cache = DefaultListFilesCache::new(size * 3, None);
747
748        let table_ref = Some(TableReference::from("table"));
749        let key1 = TableScopedPath {
750            table: table_ref.clone(),
751            path: path1,
752        };
753        let key2 = TableScopedPath {
754            table: table_ref.clone(),
755            path: path2,
756        };
757        let key3 = TableScopedPath {
758            table: table_ref,
759            path: path3,
760        };
761        // Add three entries
762        cache.put(&key1, value1);
763        cache.put(&key2, value2.clone());
764        cache.put(&key3, value3_v1);
765        assert_eq!(cache.len(), 3);
766
767        // Update path3 with same size - should not cause eviction
768        let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100);
769        cache.put(&key3, value3_v2);
770
771        assert_eq!(cache.len(), 3);
772        assert!(cache.contains_key(&key1));
773        assert!(cache.contains_key(&key2));
774        assert!(cache.contains_key(&key3));
775
776        // Update path3 with larger size that requires evicting path1 (LRU)
777        let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200);
778        cache.put(&key3, value3_v3.clone());
779
780        assert_eq!(cache.len(), 2);
781        assert!(!cache.contains_key(&key1)); // Evicted (was LRU)
782        assert!(cache.contains_key(&key2));
783        assert!(cache.contains_key(&key3));
784
785        // List cache entries
786        assert_eq!(
787            cache.list_entries(),
788            HashMap::from([
789                (
790                    key2,
791                    ListFilesEntry {
792                        metas: value2,
793                        size_bytes: size2,
794                        expires: None,
795                    }
796                ),
797                (
798                    key3,
799                    ListFilesEntry {
800                        metas: value3_v3,
801                        size_bytes: size3_v3,
802                        expires: None,
803                    }
804                )
805            ])
806        );
807    }
808
809    #[test]
810    fn test_cache_with_ttl() {
811        let ttl = Duration::from_millis(100);
812
813        let mock_time = Arc::new(MockTimeProvider::new());
814        let cache = DefaultListFilesCache::new(10000, Some(ttl))
815            .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
816
817        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
818        let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);
819
820        let table_ref = Some(TableReference::from("table"));
821        let key1 = TableScopedPath {
822            table: table_ref.clone(),
823            path: path1,
824        };
825        let key2 = TableScopedPath {
826            table: table_ref,
827            path: path2,
828        };
829        cache.put(&key1, value1.clone());
830        cache.put(&key2, value2.clone());
831
832        // Entries should be accessible immediately
833        assert!(cache.get(&key1).is_some());
834        assert!(cache.get(&key2).is_some());
835        // List cache entries
836        assert_eq!(
837            cache.list_entries(),
838            HashMap::from([
839                (
840                    key1.clone(),
841                    ListFilesEntry {
842                        metas: value1,
843                        size_bytes: size1,
844                        expires: mock_time.now().checked_add(ttl),
845                    }
846                ),
847                (
848                    key2.clone(),
849                    ListFilesEntry {
850                        metas: value2,
851                        size_bytes: size2,
852                        expires: mock_time.now().checked_add(ttl),
853                    }
854                )
855            ])
856        );
857        // Wait for TTL to expire
858        mock_time.inc(Duration::from_millis(150));
859
860        // Entries should now return None when observed through contains_key
861        assert!(!cache.contains_key(&key1));
862        assert_eq!(cache.len(), 1); // key1 was removed by contains_key()
863        assert!(!cache.contains_key(&key2));
864        assert_eq!(cache.len(), 0); // key2 was removed by contains_key()
865    }
866
867    #[test]
868    fn test_cache_with_ttl_and_lru() {
869        let ttl = Duration::from_millis(200);
870
871        let mock_time = Arc::new(MockTimeProvider::new());
872        let cache = DefaultListFilesCache::new(1000, Some(ttl))
873            .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
874
875        let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400);
876        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
877        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);
878
879        let table_ref = Some(TableReference::from("table"));
880        let key1 = TableScopedPath {
881            table: table_ref.clone(),
882            path: path1,
883        };
884        let key2 = TableScopedPath {
885            table: table_ref.clone(),
886            path: path2,
887        };
888        let key3 = TableScopedPath {
889            table: table_ref,
890            path: path3,
891        };
892        cache.put(&key1, value1);
893        mock_time.inc(Duration::from_millis(50));
894        cache.put(&key2, value2);
895        mock_time.inc(Duration::from_millis(50));
896
897        // path3 should evict path1 due to size limit
898        cache.put(&key3, value3);
899        assert!(!cache.contains_key(&key1)); // Evicted by LRU
900        assert!(cache.contains_key(&key2));
901        assert!(cache.contains_key(&key3));
902
903        mock_time.inc(Duration::from_millis(151));
904
905        assert!(!cache.contains_key(&key2)); // Expired
906        assert!(cache.contains_key(&key3)); // Still valid
907    }
908
909    #[test]
910    fn test_ttl_expiration_in_get() {
911        let ttl = Duration::from_millis(100);
912        let cache = DefaultListFilesCache::new(10000, Some(ttl));
913
914        let (path, value, _) = create_test_list_files_entry("path", 2, 50);
915        let table_ref = Some(TableReference::from("table"));
916        let key = TableScopedPath {
917            table: table_ref,
918            path,
919        };
920
921        // Cache the entry
922        cache.put(&key, value.clone());
923
924        // Entry should be accessible immediately
925        let result = cache.get(&key);
926        assert!(result.is_some());
927        assert_eq!(result.unwrap().files.len(), 2);
928
929        // Wait for TTL to expire
930        thread::sleep(Duration::from_millis(150));
931
932        // Get should return None because entry expired
933        let result2 = cache.get(&key);
934        assert!(result2.is_none());
935    }
936
937    #[test]
938    fn test_meta_heap_bytes_calculation() {
939        // Test with minimal ObjectMeta (no e_tag, no version)
940        let meta1 = ObjectMeta {
941            location: Path::from("test"),
942            last_modified: chrono::Utc::now(),
943            size: 100,
944            e_tag: None,
945            version: None,
946        };
947        assert_eq!(meta_heap_bytes(&meta1), 4); // Just the location string "test"
948
949        // Test with e_tag
950        let meta2 = ObjectMeta {
951            location: Path::from("test"),
952            last_modified: chrono::Utc::now(),
953            size: 100,
954            e_tag: Some("etag123".to_string()),
955            version: None,
956        };
957        assert_eq!(meta_heap_bytes(&meta2), 4 + 7); // location (4) + e_tag (7)
958
959        // Test with version
960        let meta3 = ObjectMeta {
961            location: Path::from("test"),
962            last_modified: chrono::Utc::now(),
963            size: 100,
964            e_tag: None,
965            version: Some("v1.0".to_string()),
966        };
967        assert_eq!(meta_heap_bytes(&meta3), 4 + 4); // location (4) + version (4)
968
969        // Test with both e_tag and version
970        let meta4 = ObjectMeta {
971            location: Path::from("test"),
972            last_modified: chrono::Utc::now(),
973            size: 100,
974            e_tag: Some("tag".to_string()),
975            version: Some("ver".to_string()),
976        };
977        assert_eq!(meta_heap_bytes(&meta4), 4 + 3 + 3); // location (4) + e_tag (3) + version (3)
978    }
979
980    #[test]
981    fn test_entry_creation() {
982        // Test with empty vector
983        let empty_list = CachedFileList::new(vec![]);
984        let now = Instant::now();
985        let entry = ListFilesEntry::try_new(empty_list, None, now);
986        assert!(entry.is_none());
987
988        // Validate entry size
989        let metas: Vec<ObjectMeta> = (0..5)
990            .map(|i| create_test_object_meta(&format!("file{i}"), 30))
991            .collect();
992        let cached_list = CachedFileList::new(metas);
993        let entry = ListFilesEntry::try_new(cached_list, None, now).unwrap();
994        assert_eq!(entry.metas.files.len(), 5);
995        // Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap bytes
996        let expected_size = (entry.metas.files.capacity() * size_of::<ObjectMeta>())
997            + (entry.metas.files.len() * 30);
998        assert_eq!(entry.size_bytes, expected_size);
999
1000        // Test with TTL
1001        let meta = create_test_object_meta("file", 50);
1002        let ttl = Duration::from_secs(10);
1003        let cached_list = CachedFileList::new(vec![meta]);
1004        let entry = ListFilesEntry::try_new(cached_list, Some(ttl), now).unwrap();
1005        assert!(entry.expires.unwrap() > now);
1006    }
1007
1008    #[test]
1009    fn test_memory_tracking() {
1010        let cache = DefaultListFilesCache::new(1000, None);
1011
1012        // Verify cache starts with 0 memory used
1013        {
1014            let state = cache.state.lock().unwrap();
1015            assert_eq!(state.memory_used, 0);
1016        }
1017
1018        // Add entry and verify memory tracking
1019        let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100);
1020        let table_ref = Some(TableReference::from("table"));
1021        let key1 = TableScopedPath {
1022            table: table_ref.clone(),
1023            path: path1,
1024        };
1025        cache.put(&key1, value1);
1026        {
1027            let state = cache.state.lock().unwrap();
1028            assert_eq!(state.memory_used, size1);
1029        }
1030
1031        // Add another entry
1032        let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200);
1033        let key2 = TableScopedPath {
1034            table: table_ref.clone(),
1035            path: path2,
1036        };
1037        cache.put(&key2, value2);
1038        {
1039            let state = cache.state.lock().unwrap();
1040            assert_eq!(state.memory_used, size1 + size2);
1041        }
1042
1043        // Remove first entry and verify memory decreases
1044        cache.remove(&key1);
1045        {
1046            let state = cache.state.lock().unwrap();
1047            assert_eq!(state.memory_used, size2);
1048        }
1049
1050        // Clear and verify memory is 0
1051        cache.clear();
1052        {
1053            let state = cache.state.lock().unwrap();
1054            assert_eq!(state.memory_used, 0);
1055        }
1056    }
1057
    // Prefix filtering tests: `files_matching_prefix` on a cached table listing
1059
1060    /// Helper function to create ObjectMeta with a specific location path
1061    fn create_object_meta_with_path(location: &str) -> ObjectMeta {
1062        ObjectMeta {
1063            location: Path::from(location),
1064            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
1065                .unwrap()
1066                .into(),
1067            size: 1024,
1068            e_tag: None,
1069            version: None,
1070        }
1071    }
1072
1073    #[test]
1074    fn test_prefix_filtering() {
1075        let cache = DefaultListFilesCache::new(100000, None);
1076
1077        // Create files for a partitioned table
1078        let table_base = Path::from("my_table");
1079        let files = vec![
1080            create_object_meta_with_path("my_table/a=1/file1.parquet"),
1081            create_object_meta_with_path("my_table/a=1/file2.parquet"),
1082            create_object_meta_with_path("my_table/a=2/file3.parquet"),
1083            create_object_meta_with_path("my_table/a=2/file4.parquet"),
1084        ];
1085
1086        // Cache the full table listing
1087        let table_ref = Some(TableReference::from("table"));
1088        let key = TableScopedPath {
1089            table: table_ref,
1090            path: table_base,
1091        };
1092        cache.put(&key, CachedFileList::new(files));
1093
1094        let result = cache.get(&key).unwrap();
1095
1096        // Filter for partition a=1
1097        let prefix_a1 = Some(Path::from("my_table/a=1"));
1098        let filtered = result.files_matching_prefix(&prefix_a1);
1099        assert_eq!(filtered.len(), 2);
1100        assert!(
1101            filtered
1102                .iter()
1103                .all(|m| m.location.as_ref().starts_with("my_table/a=1"))
1104        );
1105
1106        // Filter for partition a=2
1107        let prefix_a2 = Some(Path::from("my_table/a=2"));
1108        let filtered_2 = result.files_matching_prefix(&prefix_a2);
1109        assert_eq!(filtered_2.len(), 2);
1110        assert!(
1111            filtered_2
1112                .iter()
1113                .all(|m| m.location.as_ref().starts_with("my_table/a=2"))
1114        );
1115
1116        // No filter returns all
1117        let all = result.files_matching_prefix(&None);
1118        assert_eq!(all.len(), 4);
1119    }
1120
1121    #[test]
1122    fn test_prefix_no_matching_files() {
1123        let cache = DefaultListFilesCache::new(100000, None);
1124
1125        let table_base = Path::from("my_table");
1126        let files = vec![
1127            create_object_meta_with_path("my_table/a=1/file1.parquet"),
1128            create_object_meta_with_path("my_table/a=2/file2.parquet"),
1129        ];
1130
1131        let table_ref = Some(TableReference::from("table"));
1132        let key = TableScopedPath {
1133            table: table_ref,
1134            path: table_base,
1135        };
1136        cache.put(&key, CachedFileList::new(files));
1137        let result = cache.get(&key).unwrap();
1138
1139        // Query for partition a=3 which doesn't exist
1140        let prefix_a3 = Some(Path::from("my_table/a=3"));
1141        let filtered = result.files_matching_prefix(&prefix_a3);
1142        assert!(filtered.is_empty());
1143    }
1144
1145    #[test]
1146    fn test_nested_partitions() {
1147        let cache = DefaultListFilesCache::new(100000, None);
1148
1149        let table_base = Path::from("events");
1150        let files = vec![
1151            create_object_meta_with_path(
1152                "events/year=2024/month=01/day=01/file1.parquet",
1153            ),
1154            create_object_meta_with_path(
1155                "events/year=2024/month=01/day=02/file2.parquet",
1156            ),
1157            create_object_meta_with_path(
1158                "events/year=2024/month=02/day=01/file3.parquet",
1159            ),
1160            create_object_meta_with_path(
1161                "events/year=2025/month=01/day=01/file4.parquet",
1162            ),
1163        ];
1164
1165        let table_ref = Some(TableReference::from("table"));
1166        let key = TableScopedPath {
1167            table: table_ref,
1168            path: table_base,
1169        };
1170        cache.put(&key, CachedFileList::new(files));
1171        let result = cache.get(&key).unwrap();
1172
1173        // Filter for year=2024/month=01
1174        let prefix_month = Some(Path::from("events/year=2024/month=01"));
1175        let filtered = result.files_matching_prefix(&prefix_month);
1176        assert_eq!(filtered.len(), 2);
1177
1178        // Filter for year=2024
1179        let prefix_year = Some(Path::from("events/year=2024"));
1180        let filtered_year = result.files_matching_prefix(&prefix_year);
1181        assert_eq!(filtered_year.len(), 3);
1182    }
1183
1184    #[test]
1185    fn test_drop_table_entries() {
1186        let cache = DefaultListFilesCache::default();
1187
1188        let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100);
1189        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
1190        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
1191
1192        let table_ref1 = Some(TableReference::from("table1"));
1193        let key1 = TableScopedPath {
1194            table: table_ref1.clone(),
1195            path: path1,
1196        };
1197        let key2 = TableScopedPath {
1198            table: table_ref1.clone(),
1199            path: path2,
1200        };
1201
1202        let table_ref2 = Some(TableReference::from("table2"));
1203        let key3 = TableScopedPath {
1204            table: table_ref2.clone(),
1205            path: path3,
1206        };
1207
1208        cache.put(&key1, value1);
1209        cache.put(&key2, value2);
1210        cache.put(&key3, value3);
1211
1212        cache.drop_table_entries(&table_ref1).unwrap();
1213
1214        assert!(!cache.contains_key(&key1));
1215        assert!(!cache.contains_key(&key2));
1216        assert!(cache.contains_key(&key3));
1217    }
1218}