Skip to main content

datafusion_execution/cache/
list_files_cache.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cache::{
19    CacheAccessor,
20    cache_manager::{CachedFileList, ListFilesCache},
21    lru_queue::LruQueue,
22};
23
24use std::fmt::{Debug, Display, Formatter};
25use std::mem::size_of;
26use std::{
27    collections::HashMap,
28    sync::{Arc, Mutex},
29    time::Duration,
30};
31
32use datafusion_common::TableReference;
33use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx};
34use datafusion_common::instant::Instant;
35use object_store::{ObjectMeta, path::Path};
36
/// Source of "the current time" used by [`DefaultListFilesCache`] to evaluate entry TTLs.
///
/// Implementations must be shareable across threads (`Send + Sync`) and must not borrow
/// data (`'static`), because the cache stores them behind an `Arc<dyn TimeProvider>`.
pub trait TimeProvider: Send + Sync + 'static {
    /// Returns the current instant.
    fn now(&self) -> Instant;
}

/// [`TimeProvider`] that reads the real clock through `Instant::now()`.
#[derive(Debug, Default)]
pub struct SystemTimeProvider;

impl TimeProvider for SystemTimeProvider {
    fn now(&self) -> Instant {
        // Delegate straight to the clock; no caching or adjustment.
        Instant::now()
    }
}
49
50/// Default implementation of [`ListFilesCache`]
51///
52/// Caches file metadata for file listing operations.
53///
54/// # Internal details
55///
56/// The `memory_limit` parameter controls the maximum size of the cache, which uses a Least
57/// Recently Used eviction algorithm. When adding a new entry, if the total number of entries in
58/// the cache exceeds `memory_limit`, the least recently used entries are evicted until the total
59/// size is lower than the `memory_limit`.
60///
61/// # Cache API
62///
63/// Uses `get` and `put` methods for cache operations. TTL validation is handled internally -
64/// expired entries return `None` from `get`.
65pub struct DefaultListFilesCache {
66    state: Mutex<DefaultListFilesCacheState>,
67    time_provider: Arc<dyn TimeProvider>,
68}
69
70impl Default for DefaultListFilesCache {
71    fn default() -> Self {
72        Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None)
73    }
74}
75
76impl DefaultListFilesCache {
77    /// Creates a new instance of [`DefaultListFilesCache`].
78    ///
79    /// # Arguments
80    /// * `memory_limit` - The maximum size of the cache, in bytes.
81    /// * `ttl` - The TTL (time-to-live) of entries in the cache.
82    pub fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
83        Self {
84            state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)),
85            time_provider: Arc::new(SystemTimeProvider),
86        }
87    }
88
89    #[cfg(test)]
90    pub(crate) fn with_time_provider(mut self, provider: Arc<dyn TimeProvider>) -> Self {
91        self.time_provider = provider;
92        self
93    }
94}
95
/// A cached file listing plus the bookkeeping the cache needs for it.
#[derive(Clone, PartialEq, Debug)]
pub struct ListFilesEntry {
    /// The cached listing itself.
    pub metas: CachedFileList,
    /// Estimated memory footprint of `metas`, in bytes; charged against the cache's
    /// `memory_limit`.
    pub size_bytes: usize,
    /// The entry counts as expired once the current time is strictly later than this
    /// instant; `None` means it never expires.
    pub expires: Option<Instant>,
}
102
103impl ListFilesEntry {
104    fn try_new(
105        cached_file_list: CachedFileList,
106        ttl: Option<Duration>,
107        now: Instant,
108    ) -> Option<Self> {
109        let size_bytes = (cached_file_list.files.capacity() * size_of::<ObjectMeta>())
110            + cached_file_list
111                .files
112                .iter()
113                .map(meta_heap_bytes)
114                .reduce(|acc, b| acc + b)?;
115
116        Some(Self {
117            metas: cached_file_list,
118            size_bytes,
119            expires: ttl.map(|t| now + t),
120        })
121    }
122}
123
124/// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap.
125fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
126    let mut size = object_meta.location.as_ref().len();
127
128    if let Some(e) = &object_meta.e_tag {
129        size += e.len();
130    }
131    if let Some(v) = &object_meta.version {
132        size += v.len();
133    }
134
135    size
136}
137
/// The default memory limit, in bytes, for the [`DefaultListFilesCache`] (1 MiB).
pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1 << 20;

/// The default cache TTL for the [`DefaultListFilesCache`]: `None`, so entries never expire.
pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None;
143
/// Key for [`DefaultListFilesCache`]
///
/// Each entry is scoped to its use within a specific table. This lets the cache
/// distinguish identical paths used by different tables, and allows all entries of
/// one table to be invalidated together.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct TableScopedPath {
    /// Table this path is scoped to, if any. Entries are matched on this value when a
    /// table's entries are dropped.
    pub table: Option<TableReference>,
    /// Object store path whose listing is cached.
    pub path: Path,
}
154
/// Handles the inner state of the [`DefaultListFilesCache`] struct.
pub struct DefaultListFilesCacheState {
    /// Cached entries, tracked in recency order so the least recently used can be evicted.
    lru_queue: LruQueue<TableScopedPath, ListFilesEntry>,
    /// Budget, in bytes, for the summed `size_bytes` of all entries.
    memory_limit: usize,
    /// Sum of `size_bytes` over the entries currently held in `lru_queue`.
    memory_used: usize,
    /// TTL applied to entries when they are inserted; `None` disables expiry.
    ttl: Option<Duration>,
}
162
163impl Default for DefaultListFilesCacheState {
164    fn default() -> Self {
165        Self {
166            lru_queue: LruQueue::new(),
167            memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
168            memory_used: 0,
169            ttl: DEFAULT_LIST_FILES_CACHE_TTL,
170        }
171    }
172}
173
174impl DFHeapSize for TableScopedPath {
175    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
176        self.path.as_ref().heap_size(ctx) + self.table.heap_size(ctx)
177    }
178}
179
180impl Display for TableScopedPath {
181    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
182        if let Some(table) = &self.table {
183            write!(f, "{}, {}", self.path, table)
184        } else {
185            write!(f, "{}", self.path)
186        }
187    }
188}
189
190impl DefaultListFilesCacheState {
191    fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
192        Self {
193            lru_queue: LruQueue::new(),
194            memory_limit,
195            memory_used: 0,
196            ttl,
197        }
198    }
199
200    /// Gets an entry from the cache, checking for expiration.
201    ///
202    /// Returns the cached file list if it exists and hasn't expired.
203    /// If the entry has expired, it is removed from the cache.
204    fn get(&mut self, key: &TableScopedPath, now: Instant) -> Option<CachedFileList> {
205        let entry = self.lru_queue.get(key)?;
206
207        // Check expiration
208        if let Some(exp) = entry.expires
209            && now > exp
210        {
211            self.remove(key);
212            return None;
213        }
214
215        Some(entry.metas.clone())
216    }
217
218    /// Checks if the respective entry is currently cached.
219    ///
220    /// If the entry has expired by `now` it is removed from the cache.
221    ///
222    /// The LRU queue is not updated.
223    fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool {
224        let Some(entry) = self.lru_queue.peek(k) else {
225            return false;
226        };
227
228        match entry.expires {
229            Some(exp) if now > exp => {
230                self.remove(k);
231                false
232            }
233            _ => true,
234        }
235    }
236
237    /// Adds a new key-value pair to cache expiring at `now` + the TTL.
238    ///
239    /// This means that LRU entries might be evicted if required.
240    /// If the key is already in the cache, the previous entry is returned.
241    /// If the size of the entry is greater than the `memory_limit`, the value is not inserted.
242    fn put(
243        &mut self,
244        key: &TableScopedPath,
245        value: CachedFileList,
246        now: Instant,
247    ) -> Option<CachedFileList> {
248        let entry = ListFilesEntry::try_new(value, self.ttl, now)?;
249        let entry_size = entry.size_bytes;
250
251        // no point in trying to add this value to the cache if it cannot fit entirely
252        if entry_size > self.memory_limit {
253            return None;
254        }
255
256        // if the key is already in the cache, the old value is removed
257        let old_value = self.lru_queue.put(key.clone(), entry);
258        self.memory_used += entry_size;
259
260        if let Some(entry) = &old_value {
261            self.memory_used -= entry.size_bytes;
262        }
263
264        self.evict_entries();
265
266        old_value.map(|v| v.metas)
267    }
268
269    /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`.
270    fn evict_entries(&mut self) {
271        while self.memory_used > self.memory_limit {
272            if let Some(removed) = self.lru_queue.pop() {
273                self.memory_used -= removed.1.size_bytes;
274            } else {
275                // cache is empty while memory_used > memory_limit, cannot happen
276                debug_assert!(
277                    false,
278                    "cache is empty while memory_used > memory_limit, cannot happen"
279                );
280                return;
281            }
282        }
283    }
284
285    /// Removes an entry from the cache and returns it, if it exists.
286    fn remove(&mut self, k: &TableScopedPath) -> Option<CachedFileList> {
287        if let Some(entry) = self.lru_queue.remove(k) {
288            self.memory_used -= entry.size_bytes;
289            Some(entry.metas)
290        } else {
291            None
292        }
293    }
294
295    /// Returns the number of entries currently cached.
296    fn len(&self) -> usize {
297        self.lru_queue.len()
298    }
299
300    /// Removes all entries from the cache.
301    fn clear(&mut self) {
302        self.lru_queue.clear();
303        self.memory_used = 0;
304    }
305}
306
307impl CacheAccessor<TableScopedPath, CachedFileList> for DefaultListFilesCache {
308    fn get(&self, key: &TableScopedPath) -> Option<CachedFileList> {
309        let mut state = self.state.lock().unwrap();
310        let now = self.time_provider.now();
311        state.get(key, now)
312    }
313
314    fn put(
315        &self,
316        key: &TableScopedPath,
317        value: CachedFileList,
318    ) -> Option<CachedFileList> {
319        let mut state = self.state.lock().unwrap();
320        let now = self.time_provider.now();
321        state.put(key, value, now)
322    }
323
324    fn remove(&self, k: &TableScopedPath) -> Option<CachedFileList> {
325        let mut state = self.state.lock().unwrap();
326        state.remove(k)
327    }
328
329    fn contains_key(&self, k: &TableScopedPath) -> bool {
330        let mut state = self.state.lock().unwrap();
331        let now = self.time_provider.now();
332        state.contains_key(k, now)
333    }
334
335    fn len(&self) -> usize {
336        let state = self.state.lock().unwrap();
337        state.len()
338    }
339
340    fn clear(&self) {
341        let mut state = self.state.lock().unwrap();
342        state.clear();
343    }
344
345    fn name(&self) -> String {
346        String::from("DefaultListFilesCache")
347    }
348}
349
350impl ListFilesCache for DefaultListFilesCache {
351    fn cache_limit(&self) -> usize {
352        let state = self.state.lock().unwrap();
353        state.memory_limit
354    }
355
356    fn cache_ttl(&self) -> Option<Duration> {
357        let state = self.state.lock().unwrap();
358        state.ttl
359    }
360
361    fn update_cache_limit(&self, limit: usize) {
362        let mut state = self.state.lock().unwrap();
363        state.memory_limit = limit;
364        state.evict_entries();
365    }
366
367    fn update_cache_ttl(&self, ttl: Option<Duration>) {
368        let mut state = self.state.lock().unwrap();
369        state.ttl = ttl;
370        state.evict_entries();
371    }
372
373    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry> {
374        let state = self.state.lock().unwrap();
375        let mut entries = HashMap::<TableScopedPath, ListFilesEntry>::new();
376        for (path, entry) in state.lru_queue.list_entries() {
377            entries.insert(path.clone(), entry.clone());
378        }
379        entries
380    }
381
382    fn drop_table_entries(
383        &self,
384        table_ref: &Option<TableReference>,
385    ) -> datafusion_common::Result<()> {
386        let mut state = self.state.lock().unwrap();
387        let mut table_paths = vec![];
388        for (path, _) in state.lru_queue.list_entries() {
389            if path.table == *table_ref {
390                table_paths.push(path.clone());
391            }
392        }
393        for path in table_paths {
394            state.remove(&path);
395        }
396        Ok(())
397    }
398}
399
400#[cfg(test)]
401mod tests {
402    use super::*;
403    use chrono::DateTime;
404    use std::thread;
405
406    struct MockTimeProvider {
407        base: Instant,
408        offset: Mutex<Duration>,
409    }
410
411    impl MockTimeProvider {
412        fn new() -> Self {
413            Self {
414                base: Instant::now(),
415                offset: Mutex::new(Duration::ZERO),
416            }
417        }
418
419        fn inc(&self, duration: Duration) {
420            let mut offset = self.offset.lock().unwrap();
421            *offset += duration;
422        }
423    }
424
425    impl TimeProvider for MockTimeProvider {
426        fn now(&self) -> Instant {
427            self.base + *self.offset.lock().unwrap()
428        }
429    }
430
431    /// Helper function to create a test ObjectMeta with a specific path and location string size
432    fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta {
433        // Create a location string of the desired size by padding with zeros
434        let location_str = if location_size > path.len() {
435            format!("{}{}", path, "0".repeat(location_size - path.len()))
436        } else {
437            path.to_string()
438        };
439
440        ObjectMeta {
441            location: Path::from(location_str),
442            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
443                .unwrap()
444                .into(),
445            size: 1024,
446            e_tag: None,
447            version: None,
448        }
449    }
450
451    /// Helper function to create a CachedFileList with at least meta_size bytes
452    fn create_test_list_files_entry(
453        path: &str,
454        count: usize,
455        meta_size: usize,
456    ) -> (Path, CachedFileList, usize) {
457        let metas: Vec<ObjectMeta> = (0..count)
458            .map(|i| create_test_object_meta(&format!("file{i}"), meta_size))
459            .collect();
460
461        // Calculate actual size using the same logic as ListFilesEntry::try_new
462        let size = (metas.capacity() * size_of::<ObjectMeta>())
463            + metas.iter().map(meta_heap_bytes).sum::<usize>();
464
465        (Path::from(path), CachedFileList::new(metas), size)
466    }
467
468    #[test]
469    fn test_basic_operations() {
470        let cache = DefaultListFilesCache::default();
471        let table_ref = Some(TableReference::from("table"));
472        let path = Path::from("test_path");
473        let key = TableScopedPath {
474            table: table_ref.clone(),
475            path,
476        };
477
478        // Initially cache is empty
479        assert!(!cache.contains_key(&key));
480        assert_eq!(cache.len(), 0);
481
482        // Cache miss - get returns None
483        assert!(cache.get(&key).is_none());
484
485        // Put a value
486        let meta = create_test_object_meta("file1", 50);
487        cache.put(&key, CachedFileList::new(vec![meta]));
488
489        // Entry should be cached
490        assert!(cache.contains_key(&key));
491        assert_eq!(cache.len(), 1);
492        let result = cache.get(&key).unwrap();
493        assert_eq!(result.files.len(), 1);
494
495        // Remove the entry
496        let removed = cache.remove(&key).unwrap();
497        assert_eq!(removed.files.len(), 1);
498        assert!(!cache.contains_key(&key));
499        assert_eq!(cache.len(), 0);
500
501        // Put multiple entries
502        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
503        let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50);
504        let key1 = TableScopedPath {
505            table: table_ref.clone(),
506            path: path1,
507        };
508        let key2 = TableScopedPath {
509            table: table_ref,
510            path: path2,
511        };
512        cache.put(&key1, value1.clone());
513        cache.put(&key2, value2.clone());
514        assert_eq!(cache.len(), 2);
515
516        // List cache entries
517        assert_eq!(
518            cache.list_entries(),
519            HashMap::from([
520                (
521                    key1.clone(),
522                    ListFilesEntry {
523                        metas: value1,
524                        size_bytes: size1,
525                        expires: None,
526                    }
527                ),
528                (
529                    key2.clone(),
530                    ListFilesEntry {
531                        metas: value2,
532                        size_bytes: size2,
533                        expires: None,
534                    }
535                )
536            ])
537        );
538
539        // Clear all entries
540        cache.clear();
541        assert_eq!(cache.len(), 0);
542        assert!(!cache.contains_key(&key1));
543        assert!(!cache.contains_key(&key2));
544    }
545
546    #[test]
547    fn test_lru_eviction_basic() {
548        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
549        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
550        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
551
552        // Set cache limit to exactly fit all three entries
553        let cache = DefaultListFilesCache::new(size * 3, None);
554
555        let table_ref = Some(TableReference::from("table"));
556        let key1 = TableScopedPath {
557            table: table_ref.clone(),
558            path: path1,
559        };
560        let key2 = TableScopedPath {
561            table: table_ref.clone(),
562            path: path2,
563        };
564        let key3 = TableScopedPath {
565            table: table_ref.clone(),
566            path: path3,
567        };
568
569        // All three entries should fit
570        cache.put(&key1, value1);
571        cache.put(&key2, value2);
572        cache.put(&key3, value3);
573        assert_eq!(cache.len(), 3);
574        assert!(cache.contains_key(&key1));
575        assert!(cache.contains_key(&key2));
576        assert!(cache.contains_key(&key3));
577
578        // Adding a new entry should evict path1 (LRU)
579        let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
580        let key4 = TableScopedPath {
581            table: table_ref,
582            path: path4,
583        };
584        cache.put(&key4, value4);
585
586        assert_eq!(cache.len(), 3);
587        assert!(!cache.contains_key(&key1)); // Evicted
588        assert!(cache.contains_key(&key2));
589        assert!(cache.contains_key(&key3));
590        assert!(cache.contains_key(&key4));
591    }
592
593    #[test]
594    fn test_lru_ordering_after_access() {
595        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
596        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
597        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
598
599        // Set cache limit to fit exactly three entries
600        let cache = DefaultListFilesCache::new(size * 3, None);
601
602        let table_ref = Some(TableReference::from("table"));
603        let key1 = TableScopedPath {
604            table: table_ref.clone(),
605            path: path1,
606        };
607        let key2 = TableScopedPath {
608            table: table_ref.clone(),
609            path: path2,
610        };
611        let key3 = TableScopedPath {
612            table: table_ref.clone(),
613            path: path3,
614        };
615
616        cache.put(&key1, value1);
617        cache.put(&key2, value2);
618        cache.put(&key3, value3);
619        assert_eq!(cache.len(), 3);
620
621        // Access path1 to move it to front (MRU)
622        // Order is now: path2 (LRU), path3, path1 (MRU)
623        let _ = cache.get(&key1);
624
625        // Adding a new entry should evict path2 (the LRU)
626        let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
627        let key4 = TableScopedPath {
628            table: table_ref,
629            path: path4,
630        };
631        cache.put(&key4, value4);
632
633        assert_eq!(cache.len(), 3);
634        assert!(cache.contains_key(&key1)); // Still present (recently accessed)
635        assert!(!cache.contains_key(&key2)); // Evicted (was LRU)
636        assert!(cache.contains_key(&key3));
637        assert!(cache.contains_key(&key4));
638    }
639
640    #[test]
641    fn test_reject_too_large() {
642        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
643        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
644
645        // Set cache limit to fit both entries
646        let cache = DefaultListFilesCache::new(size * 2, None);
647
648        let table_ref = Some(TableReference::from("table"));
649        let key1 = TableScopedPath {
650            table: table_ref.clone(),
651            path: path1,
652        };
653        let key2 = TableScopedPath {
654            table: table_ref.clone(),
655            path: path2,
656        };
657        cache.put(&key1, value1);
658        cache.put(&key2, value2);
659        assert_eq!(cache.len(), 2);
660
661        // Try to add an entry that's too large to fit in the cache
662        // The entry is not stored (too large)
663        let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000);
664        let key_large = TableScopedPath {
665            table: table_ref,
666            path: path_large,
667        };
668        cache.put(&key_large, value_large);
669
670        // Large entry should not be added
671        assert!(!cache.contains_key(&key_large));
672        assert_eq!(cache.len(), 2);
673        assert!(cache.contains_key(&key1));
674        assert!(cache.contains_key(&key2));
675    }
676
677    #[test]
678    fn test_multiple_evictions() {
679        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
680        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
681        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
682
683        // Set cache limit for exactly 3 entries
684        let cache = DefaultListFilesCache::new(size * 3, None);
685
686        let table_ref = Some(TableReference::from("table"));
687        let key1 = TableScopedPath {
688            table: table_ref.clone(),
689            path: path1,
690        };
691        let key2 = TableScopedPath {
692            table: table_ref.clone(),
693            path: path2,
694        };
695        let key3 = TableScopedPath {
696            table: table_ref.clone(),
697            path: path3,
698        };
699        cache.put(&key1, value1);
700        cache.put(&key2, value2);
701        cache.put(&key3, value3);
702        assert_eq!(cache.len(), 3);
703
704        // Add a large entry that requires evicting 2 entries
705        let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200);
706        let key_large = TableScopedPath {
707            table: table_ref,
708            path: path_large,
709        };
710        cache.put(&key_large, value_large);
711
712        // path1 and path2 should be evicted (both LRU), path3 and path_large remain
713        assert_eq!(cache.len(), 2);
714        assert!(!cache.contains_key(&key1)); // Evicted
715        assert!(!cache.contains_key(&key2)); // Evicted
716        assert!(cache.contains_key(&key3));
717        assert!(cache.contains_key(&key_large));
718    }
719
720    #[test]
721    fn test_cache_limit_resize() {
722        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
723        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
724        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
725
726        let cache = DefaultListFilesCache::new(size * 3, None);
727
728        let table_ref = Some(TableReference::from("table"));
729        let key1 = TableScopedPath {
730            table: table_ref.clone(),
731            path: path1,
732        };
733        let key2 = TableScopedPath {
734            table: table_ref.clone(),
735            path: path2,
736        };
737        let key3 = TableScopedPath {
738            table: table_ref,
739            path: path3,
740        };
741        // Add three entries
742        cache.put(&key1, value1);
743        cache.put(&key2, value2);
744        cache.put(&key3, value3);
745        assert_eq!(cache.len(), 3);
746
747        // Resize cache to only fit one entry
748        cache.update_cache_limit(size);
749
750        // Should keep only the most recent entry (path3, the MRU)
751        assert_eq!(cache.len(), 1);
752        assert!(cache.contains_key(&key3));
753        // Earlier entries (LRU) should be evicted
754        assert!(!cache.contains_key(&key1));
755        assert!(!cache.contains_key(&key2));
756    }
757
758    #[test]
759    fn test_entry_update_with_size_change() {
760        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
761        let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100);
762        let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100);
763
764        let cache = DefaultListFilesCache::new(size * 3, None);
765
766        let table_ref = Some(TableReference::from("table"));
767        let key1 = TableScopedPath {
768            table: table_ref.clone(),
769            path: path1,
770        };
771        let key2 = TableScopedPath {
772            table: table_ref.clone(),
773            path: path2,
774        };
775        let key3 = TableScopedPath {
776            table: table_ref,
777            path: path3,
778        };
779        // Add three entries
780        cache.put(&key1, value1);
781        cache.put(&key2, value2.clone());
782        cache.put(&key3, value3_v1);
783        assert_eq!(cache.len(), 3);
784
785        // Update path3 with same size - should not cause eviction
786        let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100);
787        cache.put(&key3, value3_v2);
788
789        assert_eq!(cache.len(), 3);
790        assert!(cache.contains_key(&key1));
791        assert!(cache.contains_key(&key2));
792        assert!(cache.contains_key(&key3));
793
794        // Update path3 with larger size that requires evicting path1 (LRU)
795        let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200);
796        cache.put(&key3, value3_v3.clone());
797
798        assert_eq!(cache.len(), 2);
799        assert!(!cache.contains_key(&key1)); // Evicted (was LRU)
800        assert!(cache.contains_key(&key2));
801        assert!(cache.contains_key(&key3));
802
803        // List cache entries
804        assert_eq!(
805            cache.list_entries(),
806            HashMap::from([
807                (
808                    key2,
809                    ListFilesEntry {
810                        metas: value2,
811                        size_bytes: size2,
812                        expires: None,
813                    }
814                ),
815                (
816                    key3,
817                    ListFilesEntry {
818                        metas: value3_v3,
819                        size_bytes: size3_v3,
820                        expires: None,
821                    }
822                )
823            ])
824        );
825    }
826
827    #[test]
828    fn test_cache_with_ttl() {
829        let ttl = Duration::from_millis(100);
830
831        let mock_time = Arc::new(MockTimeProvider::new());
832        let cache = DefaultListFilesCache::new(10000, Some(ttl))
833            .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
834
835        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
836        let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);
837
838        let table_ref = Some(TableReference::from("table"));
839        let key1 = TableScopedPath {
840            table: table_ref.clone(),
841            path: path1,
842        };
843        let key2 = TableScopedPath {
844            table: table_ref,
845            path: path2,
846        };
847        cache.put(&key1, value1.clone());
848        cache.put(&key2, value2.clone());
849
850        // Entries should be accessible immediately
851        assert!(cache.get(&key1).is_some());
852        assert!(cache.get(&key2).is_some());
853        // List cache entries
854        assert_eq!(
855            cache.list_entries(),
856            HashMap::from([
857                (
858                    key1.clone(),
859                    ListFilesEntry {
860                        metas: value1,
861                        size_bytes: size1,
862                        expires: mock_time.now().checked_add(ttl),
863                    }
864                ),
865                (
866                    key2.clone(),
867                    ListFilesEntry {
868                        metas: value2,
869                        size_bytes: size2,
870                        expires: mock_time.now().checked_add(ttl),
871                    }
872                )
873            ])
874        );
875        // Wait for TTL to expire
876        mock_time.inc(Duration::from_millis(150));
877
878        // Entries should now return None when observed through contains_key
879        assert!(!cache.contains_key(&key1));
880        assert_eq!(cache.len(), 1); // key1 was removed by contains_key()
881        assert!(!cache.contains_key(&key2));
882        assert_eq!(cache.len(), 0); // key2 was removed by contains_key()
883    }
884
/// Entries leave the cache through two independent mechanisms: LRU eviction when
/// the memory limit is exceeded, and TTL expiry as (mock) time advances. This test
/// drives both on a single cache.
#[test]
fn test_cache_with_ttl_and_lru() {
    let ttl = Duration::from_millis(200);
    let mock_time = Arc::new(MockTimeProvider::new());
    let cache = DefaultListFilesCache::new(1000, Some(ttl))
        .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);

    let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400);
    let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
    let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);

    // Every key is scoped to the same table; only the listed path differs.
    let table_ref = Some(TableReference::from("table"));
    let scoped = |path| TableScopedPath {
        table: table_ref.clone(),
        path,
    };
    let (key1, key2, key3) = (scoped(path1), scoped(path2), scoped(path3));

    // Insert the first two listings 50ms apart (t=0 and t=50), then move to t=100.
    cache.put(&key1, value1);
    mock_time.inc(Duration::from_millis(50));
    cache.put(&key2, value2);
    mock_time.inc(Duration::from_millis(50));

    // The 1000 byte limit cannot hold all three listings, so adding key3 pushes
    // out the least recently used one, key1.
    cache.put(&key3, value3);
    assert!(!cache.contains_key(&key1));
    assert!(cache.contains_key(&key2));
    assert!(cache.contains_key(&key3));

    // At t=251, key2 (inserted at t=50) is past its 200ms TTL while key3
    // (inserted at t=100) is still within it.
    mock_time.inc(Duration::from_millis(151));
    assert!(!cache.contains_key(&key2));
    assert!(cache.contains_key(&key3));
}
926
/// `get` must stop returning an entry once its TTL has elapsed.
///
/// Time is driven by `MockTimeProvider`, as in the other TTL tests, rather than by
/// `thread::sleep`. The test therefore does not wait on the wall clock, and it does
/// not depend on the scheduler running the first `get` within the 100ms TTL.
#[test]
fn test_ttl_expiration_in_get() {
    let ttl = Duration::from_millis(100);
    let mock_time = Arc::new(MockTimeProvider::new());
    let cache = DefaultListFilesCache::new(10000, Some(ttl))
        .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);

    let (path, value, _) = create_test_list_files_entry("path", 2, 50);
    let key = TableScopedPath {
        table: Some(TableReference::from("table")),
        path,
    };

    // Cache the entry (the listing is not used afterwards, so no clone is needed)
    cache.put(&key, value);

    // Entry should be accessible immediately
    let result = cache.get(&key);
    assert!(result.is_some());
    assert_eq!(result.unwrap().files.len(), 2);

    // Advance the mock clock past the TTL
    mock_time.inc(Duration::from_millis(150));

    // Get should return None because entry expired
    assert!(cache.get(&key).is_none());
}
954
/// `meta_heap_bytes` counts the bytes of the location string plus those of the
/// optional `e_tag` and `version` strings.
#[test]
fn test_meta_heap_bytes_calculation() {
    // Every case uses location "test" (4 bytes); only e_tag/version vary.
    let meta = |e_tag: Option<&str>, version: Option<&str>| ObjectMeta {
        location: Path::from("test"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: e_tag.map(str::to_string),
        version: version.map(str::to_string),
    };

    // (e_tag, version, expected heap bytes)
    let cases = [
        // Minimal ObjectMeta: just the location string
        (None, None, 4),
        // location (4) + e_tag (7)
        (Some("etag123"), None, 4 + 7),
        // location (4) + version (4)
        (None, Some("v1.0"), 4 + 4),
        // location (4) + e_tag (3) + version (3)
        (Some("tag"), Some("ver"), 4 + 3 + 3),
    ];
    for (e_tag, version, expected) in cases {
        assert_eq!(meta_heap_bytes(&meta(e_tag, version)), expected);
    }
}
997
/// Exercises `ListFilesEntry::try_new`:
/// - empty listings yield no entry;
/// - `size_bytes` covers the vector allocation plus per-object heap bytes;
/// - a TTL sets the expiry to exactly `now + ttl`.
#[test]
fn test_entry_creation() {
    // Test with empty vector
    let empty_list = CachedFileList::new(vec![]);
    let now = Instant::now();
    let entry = ListFilesEntry::try_new(empty_list, None, now);
    assert!(entry.is_none());

    // Validate entry size
    let metas: Vec<ObjectMeta> = (0..5)
        .map(|i| create_test_object_meta(&format!("file{i}"), 30))
        .collect();
    let cached_list = CachedFileList::new(metas);
    let entry = ListFilesEntry::try_new(cached_list, None, now).unwrap();
    assert_eq!(entry.metas.files.len(), 5);
    // Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap bytes
    let expected_size = (entry.metas.files.capacity() * size_of::<ObjectMeta>())
        + (entry.metas.files.len() * 30);
    assert_eq!(entry.size_bytes, expected_size);

    // Test with TTL. Merely checking that the expiry lies in the future would also
    // accept a wrongly computed expiry, so pin it to exactly `now + ttl`.
    let meta = create_test_object_meta("file", 50);
    let ttl = Duration::from_secs(10);
    let cached_list = CachedFileList::new(vec![meta]);
    let entry = ListFilesEntry::try_new(cached_list, Some(ttl), now).unwrap();
    assert!(entry.expires.unwrap() > now);
    assert_eq!(entry.expires, now.checked_add(ttl));
}
1025
/// The cache's `memory_used` counter must follow inserts, removals and `clear`.
#[test]
fn test_memory_tracking() {
    let cache = DefaultListFilesCache::new(1000, None);
    // Reads the currently accounted memory; the state lock is released on return.
    let memory_used = || cache.state.lock().unwrap().memory_used;

    // A fresh cache accounts for nothing
    assert_eq!(memory_used(), 0);

    let table_ref = Some(TableReference::from("table"));

    // Each insert adds exactly that entry's size
    let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100);
    let key1 = TableScopedPath {
        table: table_ref.clone(),
        path: path1,
    };
    cache.put(&key1, value1);
    assert_eq!(memory_used(), size1);

    let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200);
    let key2 = TableScopedPath {
        table: table_ref,
        path: path2,
    };
    cache.put(&key2, value2);
    assert_eq!(memory_used(), size1 + size2);

    // Removing an entry releases its size
    cache.remove(&key1);
    assert_eq!(memory_used(), size2);

    // Clearing releases everything
    cache.clear();
    assert_eq!(memory_used(), 0);
}
1075
// Prefix filtering tests using CachedFileList::files_matching_prefix
1077
1078    /// Helper function to create ObjectMeta with a specific location path
1079    fn create_object_meta_with_path(location: &str) -> ObjectMeta {
1080        ObjectMeta {
1081            location: Path::from(location),
1082            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
1083                .unwrap()
1084                .into(),
1085            size: 1024,
1086            e_tag: None,
1087            version: None,
1088        }
1089    }
1090
/// A cached full-table listing can be narrowed to individual partitions by
/// prefix, and an absent prefix returns every cached file.
#[test]
fn test_prefix_filtering() {
    let cache = DefaultListFilesCache::new(100000, None);

    // A partitioned table with two files in each of the partitions a=1 and a=2
    let files: Vec<ObjectMeta> = [
        "my_table/a=1/file1.parquet",
        "my_table/a=1/file2.parquet",
        "my_table/a=2/file3.parquet",
        "my_table/a=2/file4.parquet",
    ]
    .into_iter()
    .map(create_object_meta_with_path)
    .collect();

    // The full table listing is cached under the table's base path
    let key = TableScopedPath {
        table: Some(TableReference::from("table")),
        path: Path::from("my_table"),
    };
    cache.put(&key, CachedFileList::new(files));
    let cached = cache.get(&key).unwrap();

    // Each partition prefix selects exactly its own two files
    for partition in ["my_table/a=1", "my_table/a=2"] {
        let matching = cached.files_matching_prefix(&Some(Path::from(partition)));
        assert_eq!(matching.len(), 2);
        assert!(
            matching
                .iter()
                .all(|m| m.location.as_ref().starts_with(partition))
        );
    }

    // No filter returns all
    assert_eq!(cached.files_matching_prefix(&None).len(), 4);
}
1138
/// A prefix that matches none of the cached files yields an empty result.
#[test]
fn test_prefix_no_matching_files() {
    let cache = DefaultListFilesCache::new(100000, None);

    let files: Vec<ObjectMeta> = ["my_table/a=1/file1.parquet", "my_table/a=2/file2.parquet"]
        .into_iter()
        .map(create_object_meta_with_path)
        .collect();
    let key = TableScopedPath {
        table: Some(TableReference::from("table")),
        path: Path::from("my_table"),
    };
    cache.put(&key, CachedFileList::new(files));
    let cached = cache.get(&key).unwrap();

    // Partition a=3 does not exist in the cached listing
    let missing_partition = Some(Path::from("my_table/a=3"));
    assert!(cached.files_matching_prefix(&missing_partition).is_empty());
}
1162
/// Prefixes at different depths of a multi-level partition layout select the
/// matching subtree of the cached listing.
#[test]
fn test_nested_partitions() {
    let cache = DefaultListFilesCache::new(100000, None);

    let files: Vec<ObjectMeta> = [
        "events/year=2024/month=01/day=01/file1.parquet",
        "events/year=2024/month=01/day=02/file2.parquet",
        "events/year=2024/month=02/day=01/file3.parquet",
        "events/year=2025/month=01/day=01/file4.parquet",
    ]
    .into_iter()
    .map(create_object_meta_with_path)
    .collect();

    let key = TableScopedPath {
        table: Some(TableReference::from("table")),
        path: Path::from("events"),
    };
    cache.put(&key, CachedFileList::new(files));
    let cached = cache.get(&key).unwrap();

    // (prefix, number of files expected under it)
    let expectations = [
        ("events/year=2024/month=01", 2),
        ("events/year=2024", 3),
    ];
    for (prefix, expected) in expectations {
        let prefix = Some(Path::from(prefix));
        assert_eq!(cached.files_matching_prefix(&prefix).len(), expected);
    }
}
1201
/// `drop_table_entries` removes every listing scoped to the dropped table and
/// leaves listings scoped to other tables in place.
#[test]
fn test_drop_table_entries() {
    let cache = DefaultListFilesCache::default();

    let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100);
    let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
    let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);

    let dropped_table = Some(TableReference::from("table1"));
    let kept_table = Some(TableReference::from("table2"));
    let scoped = |table: &Option<TableReference>, path| TableScopedPath {
        table: table.clone(),
        path,
    };

    // Two listings belong to the table being dropped, one to another table
    let dropped_keys = [scoped(&dropped_table, path1), scoped(&dropped_table, path2)];
    let kept_key = scoped(&kept_table, path3);

    for (key, value) in dropped_keys.iter().zip([value1, value2]) {
        cache.put(key, value);
    }
    cache.put(&kept_key, value3);

    cache.drop_table_entries(&dropped_table).unwrap();

    for key in &dropped_keys {
        assert!(!cache.contains_key(key));
    }
    assert!(cache.contains_key(&kept_key));
}
1236}