datafusion_execution/cache/
list_files_cache.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem::size_of;
19use std::{
20    collections::HashMap,
21    sync::{Arc, Mutex},
22    time::Duration,
23};
24
25use datafusion_common::TableReference;
26use datafusion_common::instant::Instant;
27use object_store::{ObjectMeta, path::Path};
28
29use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQueue};
30
/// Source of the current time used by the cache.
///
/// Putting the clock behind a trait allows it to be replaced; the `#[cfg(test)]`
/// builder `DefaultListFilesCache::with_time_provider` in this file does so.
pub trait TimeProvider: Send + Sync + 'static {
    /// Returns the current instant.
    fn now(&self) -> Instant;
}
34
/// [`TimeProvider`] that reports the time returned by `Instant::now`.
#[derive(Debug, Default)]
pub struct SystemTimeProvider;
37
impl TimeProvider for SystemTimeProvider {
    /// Returns the current instant as reported by `Instant::now`.
    fn now(&self) -> Instant {
        Instant::now()
    }
}
43
/// Default implementation of [`ListFilesCache`]
///
/// Caches file metadata for file listing operations.
///
/// # Internal details
///
/// The `memory_limit` parameter controls the maximum size of the cache, in bytes. The cache
/// uses a Least Recently Used eviction algorithm: when adding a new entry, if the total size
/// of the entries in the cache exceeds `memory_limit`, the least recently used entries are
/// evicted until the total size is no greater than the `memory_limit`.
///
/// # `Extra` Handling
///
/// The `Extra` value is an optional path prefix, relative to the key's base path.
/// [`Self::get`] is equivalent to calling [`Self::get_with_extra`] with `None` and returns
/// the whole cached listing, while `get_with_extra` with `Some(prefix)` returns only the
/// cached files under that prefix. [`Self::put_with_extra`] ignores the extra value and
/// simply calls [`Self::put`].
pub struct DefaultListFilesCache {
    /// Cached entries and configuration, guarded by a mutex.
    state: Mutex<DefaultListFilesCacheState>,
    /// Clock used when inserting entries and when checking their expiration.
    time_provider: Arc<dyn TimeProvider>,
}
64
65impl Default for DefaultListFilesCache {
66    fn default() -> Self {
67        Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None)
68    }
69}
70
71impl DefaultListFilesCache {
72    /// Creates a new instance of [`DefaultListFilesCache`].
73    ///
74    /// # Arguments
75    /// * `memory_limit` - The maximum size of the cache, in bytes.
76    /// * `ttl` - The TTL (time-to-live) of entries in the cache.
77    pub fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
78        Self {
79            state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)),
80            time_provider: Arc::new(SystemTimeProvider),
81        }
82    }
83
84    #[cfg(test)]
85    pub(crate) fn with_time_provider(mut self, provider: Arc<dyn TimeProvider>) -> Self {
86        self.time_provider = provider;
87        self
88    }
89
90    /// Returns the cache's memory limit in bytes.
91    pub fn cache_limit(&self) -> usize {
92        self.state.lock().unwrap().memory_limit
93    }
94
95    /// Updates the cache with a new memory limit in bytes.
96    pub fn update_cache_limit(&self, limit: usize) {
97        let mut state = self.state.lock().unwrap();
98        state.memory_limit = limit;
99        state.evict_entries();
100    }
101
102    /// Returns the TTL (time-to-live) applied to cache entries.
103    pub fn cache_ttl(&self) -> Option<Duration> {
104        self.state.lock().unwrap().ttl
105    }
106}
107
/// A cached file listing together with its bookkeeping data.
#[derive(Clone, PartialEq, Debug)]
pub struct ListFilesEntry {
    /// The cached object metadata.
    pub metas: Arc<Vec<ObjectMeta>>,
    /// Size, in bytes, accounted to this entry (computed by `ListFilesEntry::try_new`).
    pub size_bytes: usize,
    /// Instant after which the entry is treated as expired; `None` if it never expires.
    pub expires: Option<Instant>,
}
114
115impl ListFilesEntry {
116    fn try_new(
117        metas: Arc<Vec<ObjectMeta>>,
118        ttl: Option<Duration>,
119        now: Instant,
120    ) -> Option<Self> {
121        let size_bytes = (metas.capacity() * size_of::<ObjectMeta>())
122            + metas.iter().map(meta_heap_bytes).reduce(|acc, b| acc + b)?;
123
124        Some(Self {
125            metas,
126            size_bytes,
127            expires: ttl.map(|t| now + t),
128        })
129    }
130}
131
132/// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap.
133fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
134    let mut size = object_meta.location.as_ref().len();
135
136    if let Some(e) = &object_meta.e_tag {
137        size += e.len();
138    }
139    if let Some(v) = &object_meta.version {
140        size += v.len();
141    }
142
143    size
144}
145
/// The default memory limit, in bytes, for the [`DefaultListFilesCache`]
pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB

/// The default cache TTL for the [`DefaultListFilesCache`]; `None` means entries never expire
pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None; // Infinite
151
/// Cache key: a path together with the (optional) table it belongs to.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct TableScopedPath {
    /// Table the listing is associated with, if any.
    pub table: Option<TableReference>,
    /// Path whose listing is cached (used as the table's base path for prefix lookups).
    pub path: Path,
}
157
/// Handles the inner state of the [`DefaultListFilesCache`] struct.
pub struct DefaultListFilesCacheState {
    /// Cached entries, kept in least-recently-used order.
    lru_queue: LruQueue<TableScopedPath, ListFilesEntry>,
    /// Maximum total size of the cached entries, in bytes.
    memory_limit: usize,
    /// Sum of `size_bytes` over the entries currently cached.
    memory_used: usize,
    /// TTL applied to entries when they are inserted; `None` means no expiration.
    ttl: Option<Duration>,
}
165
166impl Default for DefaultListFilesCacheState {
167    fn default() -> Self {
168        Self {
169            lru_queue: LruQueue::new(),
170            memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
171            memory_used: 0,
172            ttl: DEFAULT_LIST_FILES_CACHE_TTL,
173        }
174    }
175}
176
177impl DefaultListFilesCacheState {
178    fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
179        Self {
180            lru_queue: LruQueue::new(),
181            memory_limit,
182            memory_used: 0,
183            ttl,
184        }
185    }
186
187    /// Performs a prefix-aware cache lookup.
188    ///
189    /// # Arguments
190    /// * `table_base` - The table's base path (the cache key)
191    /// * `prefix` - Optional prefix filter relative to the table base path
192    /// * `now` - Current time for expiration checking
193    ///
194    /// # Behavior
195    /// - Fetches the cache entry for `table_base`
196    /// - If `prefix` is `Some`, filters results to only files matching `table_base/prefix`
197    /// - Returns the (potentially filtered) results
198    ///
199    /// # Example
200    /// ```text
201    /// get_with_prefix("my_table", Some("a=1"), now)
202    ///   → Fetch cache entry for "my_table"
203    ///   → Filter to files matching "my_table/a=1/*"
204    ///   → Return filtered results
205    /// ```
206    fn get_with_prefix(
207        &mut self,
208        table_scoped_base_path: &TableScopedPath,
209        prefix: Option<&Path>,
210        now: Instant,
211    ) -> Option<Arc<Vec<ObjectMeta>>> {
212        let entry = self.lru_queue.get(table_scoped_base_path)?;
213
214        // Check expiration
215        if let Some(exp) = entry.expires
216            && now > exp
217        {
218            self.remove(table_scoped_base_path);
219            return None;
220        }
221
222        // Early return if no prefix filter - return all files
223        let Some(prefix) = prefix else {
224            return Some(Arc::clone(&entry.metas));
225        };
226
227        // Build the full prefix path: table_base/prefix
228        let table_base = &table_scoped_base_path.path;
229        let mut parts: Vec<_> = table_base.parts().collect();
230        parts.extend(prefix.parts());
231        let full_prefix = Path::from_iter(parts);
232        let full_prefix_str = full_prefix.as_ref();
233
234        // Filter files to only those matching the prefix
235        let filtered: Vec<ObjectMeta> = entry
236            .metas
237            .iter()
238            .filter(|meta| meta.location.as_ref().starts_with(full_prefix_str))
239            .cloned()
240            .collect();
241
242        if filtered.is_empty() {
243            None
244        } else {
245            Some(Arc::new(filtered))
246        }
247    }
248
249    /// Checks if the respective entry is currently cached.
250    ///
251    /// If the entry has expired by `now` it is removed from the cache.
252    ///
253    /// The LRU queue is not updated.
254    fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool {
255        let Some(entry) = self.lru_queue.peek(k) else {
256            return false;
257        };
258
259        match entry.expires {
260            Some(exp) if now > exp => {
261                self.remove(k);
262                false
263            }
264            _ => true,
265        }
266    }
267
268    /// Adds a new key-value pair to cache expiring at `now` + the TTL.
269    ///
270    /// This means that LRU entries might be evicted if required.
271    /// If the key is already in the cache, the previous entry is returned.
272    /// If the size of the entry is greater than the `memory_limit`, the value is not inserted.
273    fn put(
274        &mut self,
275        key: &TableScopedPath,
276        value: Arc<Vec<ObjectMeta>>,
277        now: Instant,
278    ) -> Option<Arc<Vec<ObjectMeta>>> {
279        let entry = ListFilesEntry::try_new(value, self.ttl, now)?;
280        let entry_size = entry.size_bytes;
281
282        // no point in trying to add this value to the cache if it cannot fit entirely
283        if entry_size > self.memory_limit {
284            return None;
285        }
286
287        // if the key is already in the cache, the old value is removed
288        let old_value = self.lru_queue.put(key.clone(), entry);
289        self.memory_used += entry_size;
290
291        if let Some(entry) = &old_value {
292            self.memory_used -= entry.size_bytes;
293        }
294
295        self.evict_entries();
296
297        old_value.map(|v| v.metas)
298    }
299
300    /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`.
301    fn evict_entries(&mut self) {
302        while self.memory_used > self.memory_limit {
303            if let Some(removed) = self.lru_queue.pop() {
304                self.memory_used -= removed.1.size_bytes;
305            } else {
306                // cache is empty while memory_used > memory_limit, cannot happen
307                debug_assert!(
308                    false,
309                    "cache is empty while memory_used > memory_limit, cannot happen"
310                );
311                return;
312            }
313        }
314    }
315
316    /// Removes an entry from the cache and returns it, if it exists.
317    fn remove(&mut self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
318        if let Some(entry) = self.lru_queue.remove(k) {
319            self.memory_used -= entry.size_bytes;
320            Some(entry.metas)
321        } else {
322            None
323        }
324    }
325
326    /// Returns the number of entries currently cached.
327    fn len(&self) -> usize {
328        self.lru_queue.len()
329    }
330
331    /// Removes all entries from the cache.
332    fn clear(&mut self) {
333        self.lru_queue.clear();
334        self.memory_used = 0;
335    }
336}
337
338impl ListFilesCache for DefaultListFilesCache {
339    fn cache_limit(&self) -> usize {
340        let state = self.state.lock().unwrap();
341        state.memory_limit
342    }
343
344    fn cache_ttl(&self) -> Option<Duration> {
345        let state = self.state.lock().unwrap();
346        state.ttl
347    }
348
349    fn update_cache_limit(&self, limit: usize) {
350        let mut state = self.state.lock().unwrap();
351        state.memory_limit = limit;
352        state.evict_entries();
353    }
354
355    fn update_cache_ttl(&self, ttl: Option<Duration>) {
356        let mut state = self.state.lock().unwrap();
357        state.ttl = ttl;
358        state.evict_entries();
359    }
360
361    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry> {
362        let state = self.state.lock().unwrap();
363        let mut entries = HashMap::<TableScopedPath, ListFilesEntry>::new();
364        for (path, entry) in state.lru_queue.list_entries() {
365            entries.insert(path.clone(), entry.clone());
366        }
367        entries
368    }
369
370    fn drop_table_entries(
371        &self,
372        table_ref: &Option<TableReference>,
373    ) -> datafusion_common::Result<()> {
374        let mut state = self.state.lock().unwrap();
375        let mut table_paths = vec![];
376        for (path, _) in state.lru_queue.list_entries() {
377            if path.table == *table_ref {
378                table_paths.push(path.clone());
379            }
380        }
381        for path in table_paths {
382            state.remove(&path);
383        }
384        Ok(())
385    }
386}
387
388impl CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
389    type Extra = Option<Path>;
390
391    /// Gets all files for the given table base path.
392    ///
393    /// This is equivalent to calling `get_with_extra(k, &None)`.
394    fn get(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
395        self.get_with_extra(k, &None)
396    }
397
398    /// Performs a prefix-aware cache lookup.
399    ///
400    /// # Arguments
401    /// * `table_base` - The table's base path (the cache key)
402    /// * `prefix` - Optional prefix filter (relative to table base) for partition filtering
403    ///
404    /// # Behavior
405    /// - Fetches the cache entry for `table_base`
406    /// - If `prefix` is `Some`, filters results to only files matching `table_base/prefix`
407    /// - Returns the (potentially filtered) results
408    ///
409    /// This enables efficient partition pruning - a single cached listing of the full table
410    /// can serve queries for any partition subset without additional storage calls.
411    fn get_with_extra(
412        &self,
413        table_scoped_path: &TableScopedPath,
414        prefix: &Self::Extra,
415    ) -> Option<Arc<Vec<ObjectMeta>>> {
416        let mut state = self.state.lock().unwrap();
417        let now = self.time_provider.now();
418        state.get_with_prefix(table_scoped_path, prefix.as_ref(), now)
419    }
420
421    fn put(
422        &self,
423        key: &TableScopedPath,
424        value: Arc<Vec<ObjectMeta>>,
425    ) -> Option<Arc<Vec<ObjectMeta>>> {
426        let mut state = self.state.lock().unwrap();
427        let now = self.time_provider.now();
428        state.put(key, value, now)
429    }
430
431    fn put_with_extra(
432        &self,
433        key: &TableScopedPath,
434        value: Arc<Vec<ObjectMeta>>,
435        _e: &Self::Extra,
436    ) -> Option<Arc<Vec<ObjectMeta>>> {
437        self.put(key, value)
438    }
439
440    fn remove(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
441        let mut state = self.state.lock().unwrap();
442        state.remove(k)
443    }
444
445    fn contains_key(&self, k: &TableScopedPath) -> bool {
446        let mut state = self.state.lock().unwrap();
447        let now = self.time_provider.now();
448        state.contains_key(k, now)
449    }
450
451    fn len(&self) -> usize {
452        let state = self.state.lock().unwrap();
453        state.len()
454    }
455
456    fn clear(&self) {
457        let mut state = self.state.lock().unwrap();
458        state.clear();
459    }
460
461    fn name(&self) -> String {
462        String::from("DefaultListFilesCache")
463    }
464}
465
466#[cfg(test)]
467mod tests {
468    use super::*;
469    use chrono::DateTime;
470
    /// Manually advanced clock for tests: reports `base` plus an adjustable offset.
    struct MockTimeProvider {
        /// Instant captured when the mock was created.
        base: Instant,
        /// Total time added to `base` so far through `inc`.
        offset: Mutex<Duration>,
    }
475
476    impl MockTimeProvider {
477        fn new() -> Self {
478            Self {
479                base: Instant::now(),
480                offset: Mutex::new(Duration::ZERO),
481            }
482        }
483
484        fn inc(&self, duration: Duration) {
485            let mut offset = self.offset.lock().unwrap();
486            *offset += duration;
487        }
488    }
489
490    impl TimeProvider for MockTimeProvider {
491        fn now(&self) -> Instant {
492            self.base + *self.offset.lock().unwrap()
493        }
494    }
495
496    /// Helper function to create a test ObjectMeta with a specific path and location string size
497    fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta {
498        // Create a location string of the desired size by padding with zeros
499        let location_str = if location_size > path.len() {
500            format!("{}{}", path, "0".repeat(location_size - path.len()))
501        } else {
502            path.to_string()
503        };
504
505        ObjectMeta {
506            location: Path::from(location_str),
507            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
508                .unwrap()
509                .into(),
510            size: 1024,
511            e_tag: None,
512            version: None,
513        }
514    }
515
516    /// Helper function to create a vector of ObjectMeta with at least meta_size bytes
517    fn create_test_list_files_entry(
518        path: &str,
519        count: usize,
520        meta_size: usize,
521    ) -> (Path, Arc<Vec<ObjectMeta>>, usize) {
522        let metas: Vec<ObjectMeta> = (0..count)
523            .map(|i| create_test_object_meta(&format!("file{i}"), meta_size))
524            .collect();
525        let metas = Arc::new(metas);
526
527        // Calculate actual size using the same logic as ListFilesEntry::try_new
528        let size = (metas.capacity() * size_of::<ObjectMeta>())
529            + metas.iter().map(meta_heap_bytes).sum::<usize>();
530
531        (Path::from(path), metas, size)
532    }
533
534    #[test]
535    fn test_basic_operations() {
536        let cache = DefaultListFilesCache::default();
537        let table_ref = Some(TableReference::from("table"));
538        let path = Path::from("test_path");
539        let key = TableScopedPath {
540            table: table_ref.clone(),
541            path,
542        };
543
544        // Initially cache is empty
545        assert!(cache.get(&key).is_none());
546        assert!(!cache.contains_key(&key));
547        assert_eq!(cache.len(), 0);
548
549        // Put an entry
550        let meta = create_test_object_meta("file1", 50);
551        let value = Arc::new(vec![meta.clone()]);
552        cache.put(&key, Arc::clone(&value));
553
554        // Entry should be retrievable
555        assert!(cache.contains_key(&key));
556        assert_eq!(cache.len(), 1);
557        let retrieved = cache.get(&key).unwrap();
558        assert_eq!(retrieved.len(), 1);
559        assert_eq!(retrieved[0].location, meta.location);
560
561        // Remove the entry
562        let removed = cache.remove(&key).unwrap();
563        assert_eq!(removed.len(), 1);
564        assert!(!cache.contains_key(&key));
565        assert_eq!(cache.len(), 0);
566
567        // Put multiple entries
568        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
569        let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50);
570        let key1 = TableScopedPath {
571            table: table_ref.clone(),
572            path: path1,
573        };
574        let key2 = TableScopedPath {
575            table: table_ref,
576            path: path2,
577        };
578        cache.put(&key1, Arc::clone(&value1));
579        cache.put(&key2, Arc::clone(&value2));
580        assert_eq!(cache.len(), 2);
581
582        // List cache entries
583        assert_eq!(
584            cache.list_entries(),
585            HashMap::from([
586                (
587                    key1.clone(),
588                    ListFilesEntry {
589                        metas: value1,
590                        size_bytes: size1,
591                        expires: None,
592                    }
593                ),
594                (
595                    key2.clone(),
596                    ListFilesEntry {
597                        metas: value2,
598                        size_bytes: size2,
599                        expires: None,
600                    }
601                )
602            ])
603        );
604
605        // Clear all entries
606        cache.clear();
607        assert_eq!(cache.len(), 0);
608        assert!(!cache.contains_key(&key1));
609        assert!(!cache.contains_key(&key2));
610    }
611
612    #[test]
613    fn test_lru_eviction_basic() {
614        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
615        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
616        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
617
618        // Set cache limit to exactly fit all three entries
619        let cache = DefaultListFilesCache::new(size * 3, None);
620
621        let table_ref = Some(TableReference::from("table"));
622        let key1 = TableScopedPath {
623            table: table_ref.clone(),
624            path: path1,
625        };
626        let key2 = TableScopedPath {
627            table: table_ref.clone(),
628            path: path2,
629        };
630        let key3 = TableScopedPath {
631            table: table_ref.clone(),
632            path: path3,
633        };
634
635        // All three entries should fit
636        cache.put(&key1, value1);
637        cache.put(&key2, value2);
638        cache.put(&key3, value3);
639        assert_eq!(cache.len(), 3);
640        assert!(cache.contains_key(&key1));
641        assert!(cache.contains_key(&key2));
642        assert!(cache.contains_key(&key3));
643
644        // Adding a new entry should evict path1 (LRU)
645        let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
646        let key4 = TableScopedPath {
647            table: table_ref,
648            path: path4,
649        };
650        cache.put(&key4, value4);
651
652        assert_eq!(cache.len(), 3);
653        assert!(!cache.contains_key(&key1)); // Evicted
654        assert!(cache.contains_key(&key2));
655        assert!(cache.contains_key(&key3));
656        assert!(cache.contains_key(&key4));
657    }
658
659    #[test]
660    fn test_lru_ordering_after_access() {
661        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
662        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
663        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
664
665        // Set cache limit to fit exactly three entries
666        let cache = DefaultListFilesCache::new(size * 3, None);
667
668        let table_ref = Some(TableReference::from("table"));
669        let key1 = TableScopedPath {
670            table: table_ref.clone(),
671            path: path1,
672        };
673        let key2 = TableScopedPath {
674            table: table_ref.clone(),
675            path: path2,
676        };
677        let key3 = TableScopedPath {
678            table: table_ref.clone(),
679            path: path3,
680        };
681
682        cache.put(&key1, value1);
683        cache.put(&key2, value2);
684        cache.put(&key3, value3);
685        assert_eq!(cache.len(), 3);
686
687        // Access path1 to move it to front (MRU)
688        // Order is now: path2 (LRU), path3, path1 (MRU)
689        cache.get(&key1);
690
691        // Adding a new entry should evict path2 (the LRU)
692        let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
693        let key4 = TableScopedPath {
694            table: table_ref,
695            path: path4,
696        };
697        cache.put(&key4, value4);
698
699        assert_eq!(cache.len(), 3);
700        assert!(cache.contains_key(&key1)); // Still present (recently accessed)
701        assert!(!cache.contains_key(&key2)); // Evicted (was LRU)
702        assert!(cache.contains_key(&key3));
703        assert!(cache.contains_key(&key4));
704    }
705
706    #[test]
707    fn test_reject_too_large() {
708        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
709        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
710
711        // Set cache limit to fit both entries
712        let cache = DefaultListFilesCache::new(size * 2, None);
713
714        let table_ref = Some(TableReference::from("table"));
715        let key1 = TableScopedPath {
716            table: table_ref.clone(),
717            path: path1,
718        };
719        let key2 = TableScopedPath {
720            table: table_ref.clone(),
721            path: path2,
722        };
723        cache.put(&key1, value1);
724        cache.put(&key2, value2);
725        assert_eq!(cache.len(), 2);
726
727        // Try to add an entry that's too large to fit in the cache
728        let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000);
729        let key_large = TableScopedPath {
730            table: table_ref,
731            path: path_large,
732        };
733        cache.put(&key_large, value_large);
734
735        // Large entry should not be added
736        assert!(!cache.contains_key(&key_large));
737        assert_eq!(cache.len(), 2);
738        assert!(cache.contains_key(&key1));
739        assert!(cache.contains_key(&key2));
740    }
741
742    #[test]
743    fn test_multiple_evictions() {
744        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
745        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
746        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
747
748        // Set cache limit for exactly 3 entries
749        let cache = DefaultListFilesCache::new(size * 3, None);
750
751        let table_ref = Some(TableReference::from("table"));
752        let key1 = TableScopedPath {
753            table: table_ref.clone(),
754            path: path1,
755        };
756        let key2 = TableScopedPath {
757            table: table_ref.clone(),
758            path: path2,
759        };
760        let key3 = TableScopedPath {
761            table: table_ref.clone(),
762            path: path3,
763        };
764        cache.put(&key1, value1);
765        cache.put(&key2, value2);
766        cache.put(&key3, value3);
767        assert_eq!(cache.len(), 3);
768
769        // Add a large entry that requires evicting 2 entries
770        let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200);
771        let key_large = TableScopedPath {
772            table: table_ref,
773            path: path_large,
774        };
775        cache.put(&key_large, value_large);
776
777        // path1 and path2 should be evicted (both LRU), path3 and path_large remain
778        assert_eq!(cache.len(), 2);
779        assert!(!cache.contains_key(&key1)); // Evicted
780        assert!(!cache.contains_key(&key2)); // Evicted
781        assert!(cache.contains_key(&key3));
782        assert!(cache.contains_key(&key_large));
783    }
784
785    #[test]
786    fn test_cache_limit_resize() {
787        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
788        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
789        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
790
791        let cache = DefaultListFilesCache::new(size * 3, None);
792
793        let table_ref = Some(TableReference::from("table"));
794        let key1 = TableScopedPath {
795            table: table_ref.clone(),
796            path: path1,
797        };
798        let key2 = TableScopedPath {
799            table: table_ref.clone(),
800            path: path2,
801        };
802        let key3 = TableScopedPath {
803            table: table_ref,
804            path: path3,
805        };
806        // Add three entries
807        cache.put(&key1, value1);
808        cache.put(&key2, value2);
809        cache.put(&key3, value3);
810        assert_eq!(cache.len(), 3);
811
812        // Resize cache to only fit one entry
813        cache.update_cache_limit(size);
814
815        // Should keep only the most recent entry (path3, the MRU)
816        assert_eq!(cache.len(), 1);
817        assert!(cache.contains_key(&key3));
818        // Earlier entries (LRU) should be evicted
819        assert!(!cache.contains_key(&key1));
820        assert!(!cache.contains_key(&key2));
821    }
822
823    #[test]
824    fn test_entry_update_with_size_change() {
825        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
826        let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100);
827        let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100);
828
829        let cache = DefaultListFilesCache::new(size * 3, None);
830
831        let table_ref = Some(TableReference::from("table"));
832        let key1 = TableScopedPath {
833            table: table_ref.clone(),
834            path: path1,
835        };
836        let key2 = TableScopedPath {
837            table: table_ref.clone(),
838            path: path2,
839        };
840        let key3 = TableScopedPath {
841            table: table_ref,
842            path: path3,
843        };
844        // Add three entries
845        cache.put(&key1, value1);
846        cache.put(&key2, Arc::clone(&value2));
847        cache.put(&key3, value3_v1);
848        assert_eq!(cache.len(), 3);
849
850        // Update path3 with same size - should not cause eviction
851        let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100);
852        cache.put(&key3, value3_v2);
853
854        assert_eq!(cache.len(), 3);
855        assert!(cache.contains_key(&key1));
856        assert!(cache.contains_key(&key2));
857        assert!(cache.contains_key(&key3));
858
859        // Update path3 with larger size that requires evicting path1 (LRU)
860        let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200);
861        cache.put(&key3, Arc::clone(&value3_v3));
862
863        assert_eq!(cache.len(), 2);
864        assert!(!cache.contains_key(&key1)); // Evicted (was LRU)
865        assert!(cache.contains_key(&key2));
866        assert!(cache.contains_key(&key3));
867
868        // List cache entries
869        assert_eq!(
870            cache.list_entries(),
871            HashMap::from([
872                (
873                    key2,
874                    ListFilesEntry {
875                        metas: value2,
876                        size_bytes: size2,
877                        expires: None,
878                    }
879                ),
880                (
881                    key3,
882                    ListFilesEntry {
883                        metas: value3_v3,
884                        size_bytes: size3_v3,
885                        expires: None,
886                    }
887                )
888            ])
889        );
890    }
891
892    #[test]
893    fn test_cache_with_ttl() {
894        let ttl = Duration::from_millis(100);
895
896        let mock_time = Arc::new(MockTimeProvider::new());
897        let cache = DefaultListFilesCache::new(10000, Some(ttl))
898            .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
899
900        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
901        let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);
902
903        let table_ref = Some(TableReference::from("table"));
904        let key1 = TableScopedPath {
905            table: table_ref.clone(),
906            path: path1,
907        };
908        let key2 = TableScopedPath {
909            table: table_ref,
910            path: path2,
911        };
912        cache.put(&key1, Arc::clone(&value1));
913        cache.put(&key2, Arc::clone(&value2));
914
915        // Entries should be accessible immediately
916        assert!(cache.get(&key1).is_some());
917        assert!(cache.get(&key2).is_some());
918        // List cache entries
919        assert_eq!(
920            cache.list_entries(),
921            HashMap::from([
922                (
923                    key1.clone(),
924                    ListFilesEntry {
925                        metas: value1,
926                        size_bytes: size1,
927                        expires: mock_time.now().checked_add(ttl),
928                    }
929                ),
930                (
931                    key2.clone(),
932                    ListFilesEntry {
933                        metas: value2,
934                        size_bytes: size2,
935                        expires: mock_time.now().checked_add(ttl),
936                    }
937                )
938            ])
939        );
940        // Wait for TTL to expire
941        mock_time.inc(Duration::from_millis(150));
942
943        // Entries should now return None and be removed when observed through get or contains_key
944        assert!(cache.get(&key1).is_none());
945        assert_eq!(cache.len(), 1); // path1 was removed by get()
946        assert!(!cache.contains_key(&key2));
947        assert_eq!(cache.len(), 0); // path2 was removed by contains_key()
948    }
949
950    #[test]
951    fn test_cache_with_ttl_and_lru() {
952        let ttl = Duration::from_millis(200);
953
954        let mock_time = Arc::new(MockTimeProvider::new());
955        let cache = DefaultListFilesCache::new(1000, Some(ttl))
956            .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
957
958        let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400);
959        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
960        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);
961
962        let table_ref = Some(TableReference::from("table"));
963        let key1 = TableScopedPath {
964            table: table_ref.clone(),
965            path: path1,
966        };
967        let key2 = TableScopedPath {
968            table: table_ref.clone(),
969            path: path2,
970        };
971        let key3 = TableScopedPath {
972            table: table_ref,
973            path: path3,
974        };
975        cache.put(&key1, value1);
976        mock_time.inc(Duration::from_millis(50));
977        cache.put(&key2, value2);
978        mock_time.inc(Duration::from_millis(50));
979
980        // path3 should evict path1 due to size limit
981        cache.put(&key3, value3);
982        assert!(!cache.contains_key(&key1)); // Evicted by LRU
983        assert!(cache.contains_key(&key2));
984        assert!(cache.contains_key(&key3));
985
986        mock_time.inc(Duration::from_millis(151));
987
988        assert!(!cache.contains_key(&key2)); // Expired
989        assert!(cache.contains_key(&key3)); // Still valid 
990    }
991
992    #[test]
993    fn test_meta_heap_bytes_calculation() {
994        // Test with minimal ObjectMeta (no e_tag, no version)
995        let meta1 = ObjectMeta {
996            location: Path::from("test"),
997            last_modified: chrono::Utc::now(),
998            size: 100,
999            e_tag: None,
1000            version: None,
1001        };
1002        assert_eq!(meta_heap_bytes(&meta1), 4); // Just the location string "test"
1003
1004        // Test with e_tag
1005        let meta2 = ObjectMeta {
1006            location: Path::from("test"),
1007            last_modified: chrono::Utc::now(),
1008            size: 100,
1009            e_tag: Some("etag123".to_string()),
1010            version: None,
1011        };
1012        assert_eq!(meta_heap_bytes(&meta2), 4 + 7); // location (4) + e_tag (7)
1013
1014        // Test with version
1015        let meta3 = ObjectMeta {
1016            location: Path::from("test"),
1017            last_modified: chrono::Utc::now(),
1018            size: 100,
1019            e_tag: None,
1020            version: Some("v1.0".to_string()),
1021        };
1022        assert_eq!(meta_heap_bytes(&meta3), 4 + 4); // location (4) + version (4)
1023
1024        // Test with both e_tag and version
1025        let meta4 = ObjectMeta {
1026            location: Path::from("test"),
1027            last_modified: chrono::Utc::now(),
1028            size: 100,
1029            e_tag: Some("tag".to_string()),
1030            version: Some("ver".to_string()),
1031        };
1032        assert_eq!(meta_heap_bytes(&meta4), 4 + 3 + 3); // location (4) + e_tag (3) + version (3)
1033    }
1034
1035    #[test]
1036    fn test_entry_creation() {
1037        // Test with empty vector
1038        let empty_vec: Arc<Vec<ObjectMeta>> = Arc::new(vec![]);
1039        let now = Instant::now();
1040        let entry = ListFilesEntry::try_new(empty_vec, None, now);
1041        assert!(entry.is_none());
1042
1043        // Validate entry size
1044        let metas: Vec<ObjectMeta> = (0..5)
1045            .map(|i| create_test_object_meta(&format!("file{i}"), 30))
1046            .collect();
1047        let metas = Arc::new(metas);
1048        let entry = ListFilesEntry::try_new(metas, None, now).unwrap();
1049        assert_eq!(entry.metas.len(), 5);
1050        // Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap bytes
1051        let expected_size =
1052            (entry.metas.capacity() * size_of::<ObjectMeta>()) + (entry.metas.len() * 30);
1053        assert_eq!(entry.size_bytes, expected_size);
1054
1055        // Test with TTL
1056        let meta = create_test_object_meta("file", 50);
1057        let ttl = Duration::from_secs(10);
1058        let entry =
1059            ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl), now).unwrap();
1060        assert!(entry.expires.unwrap() > now);
1061    }
1062
1063    #[test]
1064    fn test_memory_tracking() {
1065        let cache = DefaultListFilesCache::new(1000, None);
1066
1067        // Verify cache starts with 0 memory used
1068        {
1069            let state = cache.state.lock().unwrap();
1070            assert_eq!(state.memory_used, 0);
1071        }
1072
1073        // Add entry and verify memory tracking
1074        let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100);
1075        let table_ref = Some(TableReference::from("table"));
1076        let key1 = TableScopedPath {
1077            table: table_ref.clone(),
1078            path: path1,
1079        };
1080        cache.put(&key1, value1);
1081        {
1082            let state = cache.state.lock().unwrap();
1083            assert_eq!(state.memory_used, size1);
1084        }
1085
1086        // Add another entry
1087        let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200);
1088        let key2 = TableScopedPath {
1089            table: table_ref.clone(),
1090            path: path2,
1091        };
1092        cache.put(&key2, value2);
1093        {
1094            let state = cache.state.lock().unwrap();
1095            assert_eq!(state.memory_used, size1 + size2);
1096        }
1097
1098        // Remove first entry and verify memory decreases
1099        cache.remove(&key1);
1100        {
1101            let state = cache.state.lock().unwrap();
1102            assert_eq!(state.memory_used, size2);
1103        }
1104
1105        // Clear and verify memory is 0
1106        cache.clear();
1107        {
1108            let state = cache.state.lock().unwrap();
1109            assert_eq!(state.memory_used, 0);
1110        }
1111    }
1112
1113    // Prefix-aware cache tests
1114
1115    /// Helper function to create ObjectMeta with a specific location path
1116    fn create_object_meta_with_path(location: &str) -> ObjectMeta {
1117        ObjectMeta {
1118            location: Path::from(location),
1119            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
1120                .unwrap()
1121                .into(),
1122            size: 1024,
1123            e_tag: None,
1124            version: None,
1125        }
1126    }
1127
1128    #[test]
1129    fn test_prefix_aware_cache_hit() {
1130        // Scenario: Cache has full table listing, query for partition returns filtered results
1131        let cache = DefaultListFilesCache::new(100000, None);
1132
1133        // Create files for a partitioned table
1134        let table_base = Path::from("my_table");
1135        let files = Arc::new(vec![
1136            create_object_meta_with_path("my_table/a=1/file1.parquet"),
1137            create_object_meta_with_path("my_table/a=1/file2.parquet"),
1138            create_object_meta_with_path("my_table/a=2/file3.parquet"),
1139            create_object_meta_with_path("my_table/a=2/file4.parquet"),
1140        ]);
1141
1142        // Cache the full table listing
1143        let table_ref = Some(TableReference::from("table"));
1144        let key = TableScopedPath {
1145            table: table_ref,
1146            path: table_base,
1147        };
1148        cache.put(&key, files);
1149
1150        // Query for partition a=1 using get_with_extra
1151        // New API: get_with_extra(table_base, Some(relative_prefix))
1152        let prefix_a1 = Some(Path::from("a=1"));
1153        let result = cache.get_with_extra(&key, &prefix_a1);
1154
1155        // Should return filtered results (only files from a=1)
1156        assert!(result.is_some());
1157        let filtered = result.unwrap();
1158        assert_eq!(filtered.len(), 2);
1159        assert!(
1160            filtered
1161                .iter()
1162                .all(|m| m.location.as_ref().starts_with("my_table/a=1"))
1163        );
1164
1165        // Query for partition a=2
1166        let prefix_a2 = Some(Path::from("a=2"));
1167        let result_2 = cache.get_with_extra(&key, &prefix_a2);
1168
1169        assert!(result_2.is_some());
1170        let filtered_2 = result_2.unwrap();
1171        assert_eq!(filtered_2.len(), 2);
1172        assert!(
1173            filtered_2
1174                .iter()
1175                .all(|m| m.location.as_ref().starts_with("my_table/a=2"))
1176        );
1177    }
1178
1179    #[test]
1180    fn test_prefix_aware_cache_no_filter_returns_all() {
1181        // Scenario: Query with no prefix filter should return all files
1182        let cache = DefaultListFilesCache::new(100000, None);
1183
1184        let table_base = Path::from("my_table");
1185
1186        // Cache full table listing with 4 files
1187        let full_files = Arc::new(vec![
1188            create_object_meta_with_path("my_table/a=1/file1.parquet"),
1189            create_object_meta_with_path("my_table/a=1/file2.parquet"),
1190            create_object_meta_with_path("my_table/a=2/file3.parquet"),
1191            create_object_meta_with_path("my_table/a=2/file4.parquet"),
1192        ]);
1193        let table_ref = Some(TableReference::from("table"));
1194        let key = TableScopedPath {
1195            table: table_ref,
1196            path: table_base,
1197        };
1198        cache.put(&key, full_files);
1199
1200        // Query with no prefix filter (None) should return all 4 files
1201        let result = cache.get_with_extra(&key, &None);
1202        assert!(result.is_some());
1203        let files = result.unwrap();
1204        assert_eq!(files.len(), 4);
1205
1206        // Also test using get() which delegates to get_with_extra(&None)
1207        let result_get = cache.get(&key);
1208        assert!(result_get.is_some());
1209        assert_eq!(result_get.unwrap().len(), 4);
1210    }
1211
1212    #[test]
1213    fn test_prefix_aware_cache_miss_no_entry() {
1214        // Scenario: Table not cached, query should miss
1215        let cache = DefaultListFilesCache::new(100000, None);
1216
1217        let table_base = Path::from("my_table");
1218        let table_ref = Some(TableReference::from("table"));
1219        let key = TableScopedPath {
1220            table: table_ref,
1221            path: table_base,
1222        };
1223
1224        // Query for full table should miss (nothing cached)
1225        let result = cache.get_with_extra(&key, &None);
1226        assert!(result.is_none());
1227
1228        // Query with prefix should also miss
1229        let prefix = Some(Path::from("a=1"));
1230        let result_2 = cache.get_with_extra(&key, &prefix);
1231        assert!(result_2.is_none());
1232    }
1233
1234    #[test]
1235    fn test_prefix_aware_cache_no_matching_files() {
1236        // Scenario: Cache has table listing but no files match the requested partition
1237        let cache = DefaultListFilesCache::new(100000, None);
1238
1239        let table_base = Path::from("my_table");
1240        let files = Arc::new(vec![
1241            create_object_meta_with_path("my_table/a=1/file1.parquet"),
1242            create_object_meta_with_path("my_table/a=2/file2.parquet"),
1243        ]);
1244        let table_ref = Some(TableReference::from("table"));
1245        let key = TableScopedPath {
1246            table: table_ref,
1247            path: table_base,
1248        };
1249        cache.put(&key, files);
1250
1251        // Query for partition a=3 which doesn't exist
1252        let prefix_a3 = Some(Path::from("a=3"));
1253        let result = cache.get_with_extra(&key, &prefix_a3);
1254
1255        // Should return None since no files match
1256        assert!(result.is_none());
1257    }
1258
1259    #[test]
1260    fn test_prefix_aware_nested_partitions() {
1261        // Scenario: Table with multiple partition levels (e.g., year/month/day)
1262        let cache = DefaultListFilesCache::new(100000, None);
1263
1264        let table_base = Path::from("events");
1265        let files = Arc::new(vec![
1266            create_object_meta_with_path(
1267                "events/year=2024/month=01/day=01/file1.parquet",
1268            ),
1269            create_object_meta_with_path(
1270                "events/year=2024/month=01/day=02/file2.parquet",
1271            ),
1272            create_object_meta_with_path(
1273                "events/year=2024/month=02/day=01/file3.parquet",
1274            ),
1275            create_object_meta_with_path(
1276                "events/year=2025/month=01/day=01/file4.parquet",
1277            ),
1278        ]);
1279        let table_ref = Some(TableReference::from("table"));
1280        let key = TableScopedPath {
1281            table: table_ref,
1282            path: table_base,
1283        };
1284        cache.put(&key, files);
1285
1286        // Query for year=2024/month=01 (should get 2 files)
1287        let prefix_month = Some(Path::from("year=2024/month=01"));
1288        let result = cache.get_with_extra(&key, &prefix_month);
1289        assert!(result.is_some());
1290        assert_eq!(result.unwrap().len(), 2);
1291
1292        // Query for year=2024 (should get 3 files)
1293        let prefix_year = Some(Path::from("year=2024"));
1294        let result_year = cache.get_with_extra(&key, &prefix_year);
1295        assert!(result_year.is_some());
1296        assert_eq!(result_year.unwrap().len(), 3);
1297
1298        // Query for specific day (should get 1 file)
1299        let prefix_day = Some(Path::from("year=2024/month=01/day=01"));
1300        let result_day = cache.get_with_extra(&key, &prefix_day);
1301        assert!(result_day.is_some());
1302        assert_eq!(result_day.unwrap().len(), 1);
1303    }
1304
1305    #[test]
1306    fn test_prefix_aware_different_tables() {
1307        // Scenario: Multiple tables cached, queries should not cross-contaminate
1308        let cache = DefaultListFilesCache::new(100000, None);
1309
1310        let table_a = Path::from("table_a");
1311        let table_b = Path::from("table_b");
1312
1313        let files_a = Arc::new(vec![create_object_meta_with_path(
1314            "table_a/part=1/file1.parquet",
1315        )]);
1316        let files_b = Arc::new(vec![
1317            create_object_meta_with_path("table_b/part=1/file1.parquet"),
1318            create_object_meta_with_path("table_b/part=2/file2.parquet"),
1319        ]);
1320
1321        let table_ref_a = Some(TableReference::from("table_a"));
1322        let table_ref_b = Some(TableReference::from("table_b"));
1323        let key_a = TableScopedPath {
1324            table: table_ref_a,
1325            path: table_a,
1326        };
1327        let key_b = TableScopedPath {
1328            table: table_ref_b,
1329            path: table_b,
1330        };
1331        cache.put(&key_a, files_a);
1332        cache.put(&key_b, files_b);
1333
1334        // Query table_a should only return table_a files
1335        let result_a = cache.get(&key_a);
1336        assert!(result_a.is_some());
1337        assert_eq!(result_a.unwrap().len(), 1);
1338
1339        // Query table_b with prefix should only return matching table_b files
1340        let prefix = Some(Path::from("part=1"));
1341        let result_b = cache.get_with_extra(&key_b, &prefix);
1342        assert!(result_b.is_some());
1343        assert_eq!(result_b.unwrap().len(), 1);
1344    }
1345
1346    #[test]
1347    fn test_drop_table_entries() {
1348        let cache = DefaultListFilesCache::default();
1349
1350        let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100);
1351        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
1352        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
1353
1354        let table_ref1 = Some(TableReference::from("table1"));
1355        let key1 = TableScopedPath {
1356            table: table_ref1.clone(),
1357            path: path1,
1358        };
1359        let key2 = TableScopedPath {
1360            table: table_ref1.clone(),
1361            path: path2,
1362        };
1363
1364        let table_ref2 = Some(TableReference::from("table2"));
1365        let key3 = TableScopedPath {
1366            table: table_ref2.clone(),
1367            path: path3,
1368        };
1369
1370        cache.put(&key1, value1);
1371        cache.put(&key2, value2);
1372        cache.put(&key3, value3);
1373
1374        cache.drop_table_entries(&table_ref1).unwrap();
1375
1376        assert!(!cache.contains_key(&key1));
1377        assert!(!cache.contains_key(&key2));
1378        assert!(cache.contains_key(&key3));
1379    }
1380}