use std::mem::size_of;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};
use datafusion_common::TableReference;
use datafusion_common::instant::Instant;
use object_store::{ObjectMeta, path::Path};
use crate::cache::{
CacheAccessor,
cache_manager::{CachedFileList, ListFilesCache},
lru_queue::LruQueue,
};
/// Abstraction over the clock so tests can control the flow of time.
pub trait TimeProvider: Send + Sync + 'static {
    /// Returns the current instant.
    fn now(&self) -> Instant;
}
/// Production [`TimeProvider`] backed by the system clock.
#[derive(Debug, Default)]
pub struct SystemTimeProvider;
impl TimeProvider for SystemTimeProvider {
    fn now(&self) -> Instant {
        Instant::now()
    }
}
/// Thread-safe list-files cache with LRU eviction, a memory budget, and an
/// optional per-entry TTL. All state lives behind a single [`Mutex`].
pub struct DefaultListFilesCache {
    // Interior state (LRU queue + memory accounting); locked on every call.
    state: Mutex<DefaultListFilesCacheState>,
    // Clock source; replaced by a mock in tests.
    time_provider: Arc<dyn TimeProvider>,
}
impl Default for DefaultListFilesCache {
    /// Creates a cache with the default memory limit and no TTL.
    fn default() -> Self {
        Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None)
    }
}
impl DefaultListFilesCache {
    /// Creates a cache bounded to `memory_limit` bytes; when `ttl` is given,
    /// each entry expires that long after its insertion.
    pub fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
        Self {
            state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)),
            time_provider: Arc::new(SystemTimeProvider),
        }
    }
    /// Test-only builder hook to substitute a deterministic clock.
    #[cfg(test)]
    pub(crate) fn with_time_provider(mut self, provider: Arc<dyn TimeProvider>) -> Self {
        self.time_provider = provider;
        self
    }
}
/// A cached file listing together with cache bookkeeping data.
#[derive(Clone, PartialEq, Debug)]
pub struct ListFilesEntry {
    /// The cached object metadata list.
    pub metas: CachedFileList,
    /// Estimated in-memory size of this entry, used for budget accounting.
    pub size_bytes: usize,
    /// Absolute expiration time; `None` means the entry never expires.
    pub expires: Option<Instant>,
}
impl ListFilesEntry {
    /// Builds a cache entry from `cached_file_list`, computing its
    /// approximate in-memory footprint and, when `ttl` is given, an absolute
    /// expiration time relative to `now`.
    ///
    /// Returns `None` for an empty file list: empty listings are
    /// deliberately never cached.
    ///
    /// The size estimate counts the backing `Vec`'s full capacity (not just
    /// its length) plus the heap bytes owned by each `ObjectMeta`.
    fn try_new(
        cached_file_list: CachedFileList,
        ttl: Option<Duration>,
        now: Instant,
    ) -> Option<Self> {
        // Reject empty listings explicitly rather than relying on
        // `reduce(..)?` returning `None` for an empty iterator.
        if cached_file_list.files.is_empty() {
            return None;
        }
        let heap_bytes: usize = cached_file_list.files.iter().map(meta_heap_bytes).sum();
        let size_bytes =
            cached_file_list.files.capacity() * size_of::<ObjectMeta>() + heap_bytes;
        Some(Self {
            metas: cached_file_list,
            size_bytes,
            // Expiration is fixed at insertion time; later TTL changes do
            // not retroactively affect existing entries.
            expires: ttl.map(|t| now + t),
        })
    }
}
/// Approximate number of heap bytes owned by `object_meta`.
///
/// Counts the bytes of the location path string plus the optional `e_tag`
/// and `version` strings; the remaining fields are stored inline.
fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
    let e_tag_len = object_meta.e_tag.as_ref().map_or(0, |e| e.len());
    let version_len = object_meta.version.as_ref().map_or(0, |v| v.len());
    object_meta.location.as_ref().len() + e_tag_len + version_len
}
/// Default memory budget for the list-files cache: 1 MiB.
pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024;
/// Default TTL: entries never expire.
pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None;
/// Cache key: an object-store path scoped to the table it was listed for,
/// so listings for different tables never collide and can be dropped per
/// table.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct TableScopedPath {
    /// Owning table, if known.
    pub table: Option<TableReference>,
    /// Listed path within the object store.
    pub path: Path,
}
/// Mutex-protected interior of [`DefaultListFilesCache`].
pub struct DefaultListFilesCacheState {
    // Cached listings in recency order.
    lru_queue: LruQueue<TableScopedPath, ListFilesEntry>,
    // Maximum bytes the cache may account for.
    memory_limit: usize,
    // Bytes currently accounted for by stored entries.
    memory_used: usize,
    // TTL applied to entries at insertion time; `None` disables expiry.
    ttl: Option<Duration>,
}
impl Default for DefaultListFilesCacheState {
    /// Equivalent to [`Self::new`] with the crate-default memory limit and
    /// TTL constants.
    fn default() -> Self {
        // Delegate to `new` so field initialization lives in one place, and
        // use the named TTL constant instead of a hard-coded `None`.
        Self::new(
            DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
            DEFAULT_LIST_FILES_CACHE_TTL,
        )
    }
}
impl DefaultListFilesCacheState {
    /// Creates an empty state with the given memory budget and optional TTL.
    fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
        Self {
            lru_queue: LruQueue::new(),
            memory_limit,
            memory_used: 0,
            ttl,
        }
    }
    /// Returns a clone of the cached listing for `key`, promoting it to
    /// most-recently-used. An entry whose TTL has elapsed is removed and
    /// reported as a miss.
    fn get(&mut self, key: &TableScopedPath, now: Instant) -> Option<CachedFileList> {
        let entry = self.lru_queue.get(key)?;
        if let Some(exp) = entry.expires
            && now > exp
        {
            // Lazily drop the expired entry on access.
            self.remove(key);
            return None;
        }
        Some(entry.metas.clone())
    }
    /// Whether a live (non-expired) entry exists for `k`. Uses `peek` so the
    /// LRU order is not disturbed; an expired entry is removed as a side
    /// effect and reported as absent.
    fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool {
        let Some(entry) = self.lru_queue.peek(k) else {
            return false;
        };
        match entry.expires {
            Some(exp) if now > exp => {
                self.remove(k);
                false
            }
            _ => true,
        }
    }
    /// Inserts `value` under `key`, returning the previously cached listing
    /// if one existed. Returns `None` without inserting when the entry
    /// cannot be built (empty listing) or alone exceeds the memory budget —
    /// note `None` is therefore ambiguous between "rejected" and "no
    /// previous value".
    fn put(
        &mut self,
        key: &TableScopedPath,
        value: CachedFileList,
        now: Instant,
    ) -> Option<CachedFileList> {
        let entry = ListFilesEntry::try_new(value, self.ttl, now)?;
        let entry_size = entry.size_bytes;
        // An entry larger than the whole budget would evict everything and
        // still not fit, so reject it outright.
        if entry_size > self.memory_limit {
            return None;
        }
        let old_value = self.lru_queue.put(key.clone(), entry);
        // Account for the new entry, credit back any replaced entry, then
        // evict LRU entries until usage fits the limit again. The fresh
        // entry is MRU, so it is evicted last.
        self.memory_used += entry_size;
        if let Some(entry) = &old_value {
            self.memory_used -= entry.size_bytes;
        }
        self.evict_entries();
        old_value.map(|v| v.metas)
    }
    /// Pops least-recently-used entries until memory usage fits the limit.
    fn evict_entries(&mut self) {
        while self.memory_used > self.memory_limit {
            if let Some(removed) = self.lru_queue.pop() {
                self.memory_used -= removed.1.size_bytes;
            } else {
                // Accounting invariant: an empty queue implies zero usage.
                debug_assert!(
                    false,
                    "cache is empty while memory_used > memory_limit, cannot happen"
                );
                return;
            }
        }
    }
    /// Removes `k`, returning its listing and releasing its accounted bytes.
    fn remove(&mut self, k: &TableScopedPath) -> Option<CachedFileList> {
        if let Some(entry) = self.lru_queue.remove(k) {
            self.memory_used -= entry.size_bytes;
            Some(entry.metas)
        } else {
            None
        }
    }
    /// Number of stored entries. May count entries whose TTL has elapsed but
    /// which have not yet been touched, since expiry is lazy.
    fn len(&self) -> usize {
        self.lru_queue.len()
    }
    /// Drops all entries and resets memory accounting.
    fn clear(&mut self) {
        self.lru_queue.clear();
        self.memory_used = 0;
    }
}
impl CacheAccessor<TableScopedPath, CachedFileList> for DefaultListFilesCache {
    /// Looks up `key`, promoting it in LRU order; expired entries are
    /// removed and reported as a miss.
    fn get(&self, key: &TableScopedPath) -> Option<CachedFileList> {
        let mut guard = self.state.lock().unwrap();
        let timestamp = self.time_provider.now();
        guard.get(key, timestamp)
    }
    /// Inserts `value` under `key`, returning any previously cached listing.
    fn put(
        &self,
        key: &TableScopedPath,
        value: CachedFileList,
    ) -> Option<CachedFileList> {
        let mut guard = self.state.lock().unwrap();
        let timestamp = self.time_provider.now();
        guard.put(key, value, timestamp)
    }
    /// Removes `k` and returns its listing, if present.
    fn remove(&self, k: &TableScopedPath) -> Option<CachedFileList> {
        self.state.lock().unwrap().remove(k)
    }
    /// Whether a live (non-expired) entry exists for `k`.
    fn contains_key(&self, k: &TableScopedPath) -> bool {
        let mut guard = self.state.lock().unwrap();
        let timestamp = self.time_provider.now();
        guard.contains_key(k, timestamp)
    }
    /// Number of stored entries (expired-but-untouched entries may count).
    fn len(&self) -> usize {
        self.state.lock().unwrap().len()
    }
    /// Drops every entry.
    fn clear(&self) {
        self.state.lock().unwrap().clear();
    }
    fn name(&self) -> String {
        "DefaultListFilesCache".to_string()
    }
}
impl ListFilesCache for DefaultListFilesCache {
    /// Current memory budget in bytes.
    fn cache_limit(&self) -> usize {
        self.state.lock().unwrap().memory_limit
    }
    /// TTL applied to newly inserted entries, if any.
    fn cache_ttl(&self) -> Option<Duration> {
        self.state.lock().unwrap().ttl
    }
    /// Replaces the memory budget and immediately evicts LRU entries until
    /// usage fits under the new limit.
    fn update_cache_limit(&self, limit: usize) {
        let mut guard = self.state.lock().unwrap();
        guard.memory_limit = limit;
        guard.evict_entries();
    }
    /// Replaces the TTL used for future insertions. Existing entries keep
    /// the expiration computed when they were inserted.
    fn update_cache_ttl(&self, ttl: Option<Duration>) {
        let mut guard = self.state.lock().unwrap();
        guard.ttl = ttl;
        guard.evict_entries();
    }
    /// Snapshot of all cached entries, keyed by table-scoped path.
    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry> {
        let guard = self.state.lock().unwrap();
        guard
            .lru_queue
            .list_entries()
            .into_iter()
            .map(|(path, entry)| (path.clone(), entry.clone()))
            .collect()
    }
    /// Removes every cached listing that belongs to `table_ref`.
    fn drop_table_entries(
        &self,
        table_ref: &Option<TableReference>,
    ) -> datafusion_common::Result<()> {
        let mut guard = self.state.lock().unwrap();
        // Collect the matching keys first so the iteration borrow on the
        // LRU queue ends before mutation starts.
        let doomed: Vec<TableScopedPath> = guard
            .lru_queue
            .list_entries()
            .into_iter()
            .filter(|(path, _)| path.table == *table_ref)
            .map(|(path, _)| path.clone())
            .collect();
        for path in doomed {
            guard.remove(&path);
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::DateTime;
    use std::thread;
    // Deterministic clock for TTL tests: a fixed base instant plus a
    // manually advanced offset.
    struct MockTimeProvider {
        base: Instant,
        offset: Mutex<Duration>,
    }
    impl MockTimeProvider {
        fn new() -> Self {
            Self {
                base: Instant::now(),
                offset: Mutex::new(Duration::ZERO),
            }
        }
        // Advances the mock clock by `duration`.
        fn inc(&self, duration: Duration) {
            let mut offset = self.offset.lock().unwrap();
            *offset += duration;
        }
    }
    impl TimeProvider for MockTimeProvider {
        fn now(&self) -> Instant {
            self.base + *self.offset.lock().unwrap()
        }
    }
    // Builds an ObjectMeta whose location is `path` padded with '0's up to
    // `location_size` bytes, so tests can control heap-size accounting.
    fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta {
        let location_str = if location_size > path.len() {
            format!("{}{}", path, "0".repeat(location_size - path.len()))
        } else {
            path.to_string()
        };
        ObjectMeta {
            location: Path::from(location_str),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size: 1024,
            e_tag: None,
            version: None,
        }
    }
    // Builds a listing of `count` files (each with a `meta_size`-byte
    // location) and returns (path, listing, expected accounted size), where
    // the size mirrors the formula in `ListFilesEntry::try_new`.
    fn create_test_list_files_entry(
        path: &str,
        count: usize,
        meta_size: usize,
    ) -> (Path, CachedFileList, usize) {
        let metas: Vec<ObjectMeta> = (0..count)
            .map(|i| create_test_object_meta(&format!("file{i}"), meta_size))
            .collect();
        let size = (metas.capacity() * size_of::<ObjectMeta>())
            + metas.iter().map(meta_heap_bytes).sum::<usize>();
        (Path::from(path), CachedFileList::new(metas), size)
    }
    // Round-trips get/put/remove/clear/len/list_entries on a default cache.
    #[test]
    fn test_basic_operations() {
        let cache = DefaultListFilesCache::default();
        let table_ref = Some(TableReference::from("table"));
        let path = Path::from("test_path");
        let key = TableScopedPath {
            table: table_ref.clone(),
            path,
        };
        assert!(!cache.contains_key(&key));
        assert_eq!(cache.len(), 0);
        assert!(cache.get(&key).is_none());
        let meta = create_test_object_meta("file1", 50);
        cache.put(&key, CachedFileList::new(vec![meta]));
        assert!(cache.contains_key(&key));
        assert_eq!(cache.len(), 1);
        let result = cache.get(&key).unwrap();
        assert_eq!(result.files.len(), 1);
        let removed = cache.remove(&key).unwrap();
        assert_eq!(removed.files.len(), 1);
        assert!(!cache.contains_key(&key));
        assert_eq!(cache.len(), 0);
        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
        let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50);
        let key1 = TableScopedPath {
            table: table_ref.clone(),
            path: path1,
        };
        let key2 = TableScopedPath {
            table: table_ref,
            path: path2,
        };
        cache.put(&key1, value1.clone());
        cache.put(&key2, value2.clone());
        assert_eq!(cache.len(), 2);
        // list_entries should expose both entries with the sizes computed by
        // the helper and no expiration (no TTL configured).
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                (
                    key1.clone(),
                    ListFilesEntry {
                        metas: value1,
                        size_bytes: size1,
                        expires: None,
                    }
                ),
                (
                    key2.clone(),
                    ListFilesEntry {
                        metas: value2,
                        size_bytes: size2,
                        expires: None,
                    }
                )
            ])
        );
        cache.clear();
        assert_eq!(cache.len(), 0);
        assert!(!cache.contains_key(&key1));
        assert!(!cache.contains_key(&key2));
    }
    // With capacity for three entries, inserting a fourth evicts the least
    // recently used one.
    #[test]
    fn test_lru_eviction_basic() {
        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
        let cache = DefaultListFilesCache::new(size * 3, None);
        let table_ref = Some(TableReference::from("table"));
        let key1 = TableScopedPath {
            table: table_ref.clone(),
            path: path1,
        };
        let key2 = TableScopedPath {
            table: table_ref.clone(),
            path: path2,
        };
        let key3 = TableScopedPath {
            table: table_ref.clone(),
            path: path3,
        };
        cache.put(&key1, value1);
        cache.put(&key2, value2);
        cache.put(&key3, value3);
        assert_eq!(cache.len(), 3);
        assert!(cache.contains_key(&key1));
        assert!(cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
        let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
        let key4 = TableScopedPath {
            table: table_ref,
            path: path4,
        };
        cache.put(&key4, value4);
        assert_eq!(cache.len(), 3);
        // key1 was least recently used, so it is the one evicted.
        assert!(!cache.contains_key(&key1));
        assert!(cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
        assert!(cache.contains_key(&key4));
    }
    // A get() refreshes an entry's recency, changing which key is evicted.
    #[test]
    fn test_lru_ordering_after_access() {
        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
        let cache = DefaultListFilesCache::new(size * 3, None);
        let table_ref = Some(TableReference::from("table"));
        let key1 = TableScopedPath {
            table: table_ref.clone(),
            path: path1,
        };
        let key2 = TableScopedPath {
            table: table_ref.clone(),
            path: path2,
        };
        let key3 = TableScopedPath {
            table: table_ref.clone(),
            path: path3,
        };
        cache.put(&key1, value1);
        cache.put(&key2, value2);
        cache.put(&key3, value3);
        assert_eq!(cache.len(), 3);
        // Touch key1 so key2 becomes the LRU entry.
        let _ = cache.get(&key1);
        let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
        let key4 = TableScopedPath {
            table: table_ref,
            path: path4,
        };
        cache.put(&key4, value4);
        assert_eq!(cache.len(), 3);
        assert!(cache.contains_key(&key1));
        assert!(!cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
        assert!(cache.contains_key(&key4));
    }
    // An entry larger than the whole budget is rejected without disturbing
    // existing entries.
    #[test]
    fn test_reject_too_large() {
        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
        let cache = DefaultListFilesCache::new(size * 2, None);
        let table_ref = Some(TableReference::from("table"));
        let key1 = TableScopedPath {
            table: table_ref.clone(),
            path: path1,
        };
        let key2 = TableScopedPath {
            table: table_ref.clone(),
            path: path2,
        };
        cache.put(&key1, value1);
        cache.put(&key2, value2);
        assert_eq!(cache.len(), 2);
        let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000);
        let key_large = TableScopedPath {
            table: table_ref,
            path: path_large,
        };
        cache.put(&key_large, value_large);
        assert!(!cache.contains_key(&key_large));
        assert_eq!(cache.len(), 2);
        assert!(cache.contains_key(&key1));
        assert!(cache.contains_key(&key2));
    }
    // A single oversized-but-fitting insert can evict more than one entry.
    #[test]
    fn test_multiple_evictions() {
        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
        let cache = DefaultListFilesCache::new(size * 3, None);
        let table_ref = Some(TableReference::from("table"));
        let key1 = TableScopedPath {
            table: table_ref.clone(),
            path: path1,
        };
        let key2 = TableScopedPath {
            table: table_ref.clone(),
            path: path2,
        };
        let key3 = TableScopedPath {
            table: table_ref.clone(),
            path: path3,
        };
        cache.put(&key1, value1);
        cache.put(&key2, value2);
        cache.put(&key3, value3);
        assert_eq!(cache.len(), 3);
        // Roughly twice the size of a normal entry: needs two evictions.
        let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200);
        let key_large = TableScopedPath {
            table: table_ref,
            path: path_large,
        };
        cache.put(&key_large, value_large);
        assert_eq!(cache.len(), 2);
        assert!(!cache.contains_key(&key1));
        assert!(!cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
        assert!(cache.contains_key(&key_large));
    }
    // Shrinking the limit evicts immediately, keeping the most recent entry.
    #[test]
    fn test_cache_limit_resize() {
        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
        let cache = DefaultListFilesCache::new(size * 3, None);
        let table_ref = Some(TableReference::from("table"));
        let key1 = TableScopedPath {
            table: table_ref.clone(),
            path: path1,
        };
        let key2 = TableScopedPath {
            table: table_ref.clone(),
            path: path2,
        };
        let key3 = TableScopedPath {
            table: table_ref,
            path: path3,
        };
        cache.put(&key1, value1);
        cache.put(&key2, value2);
        cache.put(&key3, value3);
        assert_eq!(cache.len(), 3);
        cache.update_cache_limit(size);
        assert_eq!(cache.len(), 1);
        assert!(cache.contains_key(&key3));
        assert!(!cache.contains_key(&key1));
        assert!(!cache.contains_key(&key2));
    }
    // Replacing an entry adjusts memory accounting; growing it may trigger
    // evictions of other keys.
    #[test]
    fn test_entry_update_with_size_change() {
        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100);
        let cache = DefaultListFilesCache::new(size * 3, None);
        let table_ref = Some(TableReference::from("table"));
        let key1 = TableScopedPath {
            table: table_ref.clone(),
            path: path1,
        };
        let key2 = TableScopedPath {
            table: table_ref.clone(),
            path: path2,
        };
        let key3 = TableScopedPath {
            table: table_ref,
            path: path3,
        };
        cache.put(&key1, value1);
        cache.put(&key2, value2.clone());
        cache.put(&key3, value3_v1);
        assert_eq!(cache.len(), 3);
        // Same-size replacement: nothing is evicted.
        let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100);
        cache.put(&key3, value3_v2);
        assert_eq!(cache.len(), 3);
        assert!(cache.contains_key(&key1));
        assert!(cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
        // Larger replacement: LRU entry (key1) must be evicted.
        let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200);
        cache.put(&key3, value3_v3.clone());
        assert_eq!(cache.len(), 2);
        assert!(!cache.contains_key(&key1));
        assert!(cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                (
                    key2,
                    ListFilesEntry {
                        metas: value2,
                        size_bytes: size2,
                        expires: None,
                    }
                ),
                (
                    key3,
                    ListFilesEntry {
                        metas: value3_v3,
                        size_bytes: size3_v3,
                        expires: None,
                    }
                )
            ])
        );
    }
    // Entries expire after the TTL; expiry is lazy, so len() drops only as
    // keys are touched.
    #[test]
    fn test_cache_with_ttl() {
        let ttl = Duration::from_millis(100);
        let mock_time = Arc::new(MockTimeProvider::new());
        let cache = DefaultListFilesCache::new(10000, Some(ttl))
            .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
        let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);
        let table_ref = Some(TableReference::from("table"));
        let key1 = TableScopedPath {
            table: table_ref.clone(),
            path: path1,
        };
        let key2 = TableScopedPath {
            table: table_ref,
            path: path2,
        };
        cache.put(&key1, value1.clone());
        cache.put(&key2, value2.clone());
        assert!(cache.get(&key1).is_some());
        assert!(cache.get(&key2).is_some());
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                (
                    key1.clone(),
                    ListFilesEntry {
                        metas: value1,
                        size_bytes: size1,
                        expires: mock_time.now().checked_add(ttl),
                    }
                ),
                (
                    key2.clone(),
                    ListFilesEntry {
                        metas: value2,
                        size_bytes: size2,
                        expires: mock_time.now().checked_add(ttl),
                    }
                )
            ])
        );
        // Past the TTL: each contains_key() lazily removes one expired key.
        mock_time.inc(Duration::from_millis(150));
        assert!(!cache.contains_key(&key1));
        assert_eq!(cache.len(), 1);
        assert!(!cache.contains_key(&key2));
        assert_eq!(cache.len(), 0);
    }
    // TTL and LRU interact: eviction removes the LRU entry, then time
    // advancing expires the rest one by one.
    #[test]
    fn test_cache_with_ttl_and_lru() {
        let ttl = Duration::from_millis(200);
        let mock_time = Arc::new(MockTimeProvider::new());
        let cache = DefaultListFilesCache::new(1000, Some(ttl))
            .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
        let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);
        let table_ref = Some(TableReference::from("table"));
        let key1 = TableScopedPath {
            table: table_ref.clone(),
            path: path1,
        };
        let key2 = TableScopedPath {
            table: table_ref.clone(),
            path: path2,
        };
        let key3 = TableScopedPath {
            table: table_ref,
            path: path3,
        };
        cache.put(&key1, value1);
        mock_time.inc(Duration::from_millis(50));
        cache.put(&key2, value2);
        mock_time.inc(Duration::from_millis(50));
        cache.put(&key3, value3);
        // key1 was evicted by memory pressure, not TTL.
        assert!(!cache.contains_key(&key1));
        assert!(cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
        // At t=251ms key2 (inserted at t=50, expires t=250) is expired while
        // key3 (inserted at t=100, expires t=300) is still live.
        mock_time.inc(Duration::from_millis(151));
        assert!(!cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
    }
    // get() itself observes TTL expiry (uses the real system clock).
    #[test]
    fn test_ttl_expiration_in_get() {
        let ttl = Duration::from_millis(100);
        let cache = DefaultListFilesCache::new(10000, Some(ttl));
        let (path, value, _) = create_test_list_files_entry("path", 2, 50);
        let table_ref = Some(TableReference::from("table"));
        let key = TableScopedPath {
            table: table_ref,
            path,
        };
        cache.put(&key, value.clone());
        let result = cache.get(&key);
        assert!(result.is_some());
        assert_eq!(result.unwrap().files.len(), 2);
        thread::sleep(Duration::from_millis(150));
        let result2 = cache.get(&key);
        assert!(result2.is_none());
    }
    // meta_heap_bytes counts location + optional e_tag/version string bytes.
    #[test]
    fn test_meta_heap_bytes_calculation() {
        let meta1 = ObjectMeta {
            location: Path::from("test"),
            last_modified: chrono::Utc::now(),
            size: 100,
            e_tag: None,
            version: None,
        };
        assert_eq!(meta_heap_bytes(&meta1), 4);
        let meta2 = ObjectMeta {
            location: Path::from("test"),
            last_modified: chrono::Utc::now(),
            size: 100,
            e_tag: Some("etag123".to_string()),
            version: None,
        };
        assert_eq!(meta_heap_bytes(&meta2), 4 + 7);
        let meta3 = ObjectMeta {
            location: Path::from("test"),
            last_modified: chrono::Utc::now(),
            size: 100,
            e_tag: None,
            version: Some("v1.0".to_string()),
        };
        assert_eq!(meta_heap_bytes(&meta3), 4 + 4);
        let meta4 = ObjectMeta {
            location: Path::from("test"),
            last_modified: chrono::Utc::now(),
            size: 100,
            e_tag: Some("tag".to_string()),
            version: Some("ver".to_string()),
        };
        assert_eq!(meta_heap_bytes(&meta4), 4 + 3 + 3);
    }
    // try_new rejects empty listings and computes size/expiry for others.
    #[test]
    fn test_entry_creation() {
        let empty_list = CachedFileList::new(vec![]);
        let now = Instant::now();
        let entry = ListFilesEntry::try_new(empty_list, None, now);
        assert!(entry.is_none());
        let metas: Vec<ObjectMeta> = (0..5)
            .map(|i| create_test_object_meta(&format!("file{i}"), 30))
            .collect();
        let cached_list = CachedFileList::new(metas);
        let entry = ListFilesEntry::try_new(cached_list, None, now).unwrap();
        assert_eq!(entry.metas.files.len(), 5);
        let expected_size = (entry.metas.files.capacity() * size_of::<ObjectMeta>())
            + (entry.metas.files.len() * 30);
        assert_eq!(entry.size_bytes, expected_size);
        let meta = create_test_object_meta("file", 50);
        let ttl = Duration::from_secs(10);
        let cached_list = CachedFileList::new(vec![meta]);
        let entry = ListFilesEntry::try_new(cached_list, Some(ttl), now).unwrap();
        assert!(entry.expires.unwrap() > now);
    }
    // memory_used tracks inserts, removals, and clear() exactly.
    #[test]
    fn test_memory_tracking() {
        let cache = DefaultListFilesCache::new(1000, None);
        {
            let state = cache.state.lock().unwrap();
            assert_eq!(state.memory_used, 0);
        }
        let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100);
        let table_ref = Some(TableReference::from("table"));
        let key1 = TableScopedPath {
            table: table_ref.clone(),
            path: path1,
        };
        cache.put(&key1, value1);
        {
            let state = cache.state.lock().unwrap();
            assert_eq!(state.memory_used, size1);
        }
        let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200);
        let key2 = TableScopedPath {
            table: table_ref.clone(),
            path: path2,
        };
        cache.put(&key2, value2);
        {
            let state = cache.state.lock().unwrap();
            assert_eq!(state.memory_used, size1 + size2);
        }
        cache.remove(&key1);
        {
            let state = cache.state.lock().unwrap();
            assert_eq!(state.memory_used, size2);
        }
        cache.clear();
        {
            let state = cache.state.lock().unwrap();
            assert_eq!(state.memory_used, 0);
        }
    }
    // Builds an ObjectMeta at an exact location (no padding).
    fn create_object_meta_with_path(location: &str) -> ObjectMeta {
        ObjectMeta {
            location: Path::from(location),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size: 1024,
            e_tag: None,
            version: None,
        }
    }
    // files_matching_prefix filters a cached listing by partition prefix.
    #[test]
    fn test_prefix_filtering() {
        let cache = DefaultListFilesCache::new(100000, None);
        let table_base = Path::from("my_table");
        let files = vec![
            create_object_meta_with_path("my_table/a=1/file1.parquet"),
            create_object_meta_with_path("my_table/a=1/file2.parquet"),
            create_object_meta_with_path("my_table/a=2/file3.parquet"),
            create_object_meta_with_path("my_table/a=2/file4.parquet"),
        ];
        let table_ref = Some(TableReference::from("table"));
        let key = TableScopedPath {
            table: table_ref,
            path: table_base,
        };
        cache.put(&key, CachedFileList::new(files));
        let result = cache.get(&key).unwrap();
        let prefix_a1 = Some(Path::from("my_table/a=1"));
        let filtered = result.files_matching_prefix(&prefix_a1);
        assert_eq!(filtered.len(), 2);
        assert!(
            filtered
                .iter()
                .all(|m| m.location.as_ref().starts_with("my_table/a=1"))
        );
        let prefix_a2 = Some(Path::from("my_table/a=2"));
        let filtered_2 = result.files_matching_prefix(&prefix_a2);
        assert_eq!(filtered_2.len(), 2);
        assert!(
            filtered_2
                .iter()
                .all(|m| m.location.as_ref().starts_with("my_table/a=2"))
        );
        // No prefix means no filtering.
        let all = result.files_matching_prefix(&None);
        assert_eq!(all.len(), 4);
    }
    // A prefix matching no files yields an empty result.
    #[test]
    fn test_prefix_no_matching_files() {
        let cache = DefaultListFilesCache::new(100000, None);
        let table_base = Path::from("my_table");
        let files = vec![
            create_object_meta_with_path("my_table/a=1/file1.parquet"),
            create_object_meta_with_path("my_table/a=2/file2.parquet"),
        ];
        let table_ref = Some(TableReference::from("table"));
        let key = TableScopedPath {
            table: table_ref,
            path: table_base,
        };
        cache.put(&key, CachedFileList::new(files));
        let result = cache.get(&key).unwrap();
        let prefix_a3 = Some(Path::from("my_table/a=3"));
        let filtered = result.files_matching_prefix(&prefix_a3);
        assert!(filtered.is_empty());
    }
    // Prefix filtering works at any partition depth.
    #[test]
    fn test_nested_partitions() {
        let cache = DefaultListFilesCache::new(100000, None);
        let table_base = Path::from("events");
        let files = vec![
            create_object_meta_with_path(
                "events/year=2024/month=01/day=01/file1.parquet",
            ),
            create_object_meta_with_path(
                "events/year=2024/month=01/day=02/file2.parquet",
            ),
            create_object_meta_with_path(
                "events/year=2024/month=02/day=01/file3.parquet",
            ),
            create_object_meta_with_path(
                "events/year=2025/month=01/day=01/file4.parquet",
            ),
        ];
        let table_ref = Some(TableReference::from("table"));
        let key = TableScopedPath {
            table: table_ref,
            path: table_base,
        };
        cache.put(&key, CachedFileList::new(files));
        let result = cache.get(&key).unwrap();
        let prefix_month = Some(Path::from("events/year=2024/month=01"));
        let filtered = result.files_matching_prefix(&prefix_month);
        assert_eq!(filtered.len(), 2);
        let prefix_year = Some(Path::from("events/year=2024"));
        let filtered_year = result.files_matching_prefix(&prefix_year);
        assert_eq!(filtered_year.len(), 3);
    }
    // drop_table_entries removes only the keys scoped to the given table.
    #[test]
    fn test_drop_table_entries() {
        let cache = DefaultListFilesCache::default();
        let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
        let table_ref1 = Some(TableReference::from("table1"));
        let key1 = TableScopedPath {
            table: table_ref1.clone(),
            path: path1,
        };
        let key2 = TableScopedPath {
            table: table_ref1.clone(),
            path: path2,
        };
        let table_ref2 = Some(TableReference::from("table2"));
        let key3 = TableScopedPath {
            table: table_ref2.clone(),
            path: path3,
        };
        cache.put(&key1, value1);
        cache.put(&key2, value2);
        cache.put(&key3, value3);
        cache.drop_table_entries(&table_ref1).unwrap();
        assert!(!cache.contains_key(&key1));
        assert!(!cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
    }
}