use std::mem::size_of;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};
use datafusion_common::TableReference;
use datafusion_common::instant::Instant;
use object_store::{ObjectMeta, path::Path};
use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQueue};
/// Source of the current time used by [`DefaultListFilesCache`] to stamp
/// insertions and check entry expiry.
///
/// Abstracted behind a trait so tests can inject a controllable clock.
pub trait TimeProvider: Send + Sync + 'static {
    /// Returns the current instant.
    fn now(&self) -> Instant;
}
/// [`TimeProvider`] backed by the real clock (`Instant::now()`); the default
/// used by [`DefaultListFilesCache::new`].
#[derive(Debug, Default)]
pub struct SystemTimeProvider;

impl TimeProvider for SystemTimeProvider {
    fn now(&self) -> Instant {
        Instant::now()
    }
}
/// Memory-bounded cache of object-store listings keyed by [`TableScopedPath`].
///
/// Entries are evicted in least-recently-used order once their estimated size
/// exceeds the configured memory limit, and may optionally expire after a
/// time-to-live. All state sits behind a single [`Mutex`] that every operation
/// locks.
pub struct DefaultListFilesCache {
    /// Cached entries together with memory accounting and TTL configuration.
    state: Mutex<DefaultListFilesCacheState>,
    // Clock used for expiry; `SystemTimeProvider` unless replaced in tests.
    time_provider: Arc<dyn TimeProvider>,
}
impl Default for DefaultListFilesCache {
fn default() -> Self {
Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None)
}
}
impl DefaultListFilesCache {
pub fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
Self {
state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)),
time_provider: Arc::new(SystemTimeProvider),
}
}
#[cfg(test)]
pub(crate) fn with_time_provider(mut self, provider: Arc<dyn TimeProvider>) -> Self {
self.time_provider = provider;
self
}
pub fn cache_limit(&self) -> usize {
self.state.lock().unwrap().memory_limit
}
pub fn update_cache_limit(&self, limit: usize) {
let mut state = self.state.lock().unwrap();
state.memory_limit = limit;
state.evict_entries();
}
pub fn cache_ttl(&self) -> Option<Duration> {
self.state.lock().unwrap().ttl
}
}
/// A cached listing together with its accounting metadata.
#[derive(Clone, PartialEq, Debug)]
pub struct ListFilesEntry {
    /// The listed objects, shared with callers without copying.
    pub metas: Arc<Vec<ObjectMeta>>,
    /// Estimated memory footprint: the vector's allocated slots times
    /// `size_of::<ObjectMeta>()` plus the string bytes counted by
    /// `meta_heap_bytes` for each object.
    pub size_bytes: usize,
    /// Instant after which the entry is considered expired (insertion time
    /// plus the cache TTL), or `None` if it never expires.
    pub expires: Option<Instant>,
}
impl ListFilesEntry {
    /// Wraps `metas` in an entry, computing its estimated size and, when `ttl`
    /// is set, its expiry relative to `now`.
    ///
    /// Returns `None` for an empty listing; such listings are never cached.
    fn try_new(
        metas: Arc<Vec<ObjectMeta>>,
        ttl: Option<Duration>,
        now: Instant,
    ) -> Option<Self> {
        if metas.is_empty() {
            return None;
        }
        // Inline storage of every allocated slot, plus the heap-allocated
        // strings owned by each object.
        let inline_bytes = metas.capacity() * size_of::<ObjectMeta>();
        let heap_bytes: usize = metas.iter().map(meta_heap_bytes).sum();
        let expires = match ttl {
            Some(lifetime) => Some(now + lifetime),
            None => None,
        };
        Some(Self {
            metas,
            size_bytes: inline_bytes + heap_bytes,
            expires,
        })
    }
}
/// Number of string bytes owned by `object_meta`: the location path plus the
/// optional e-tag and version, when present.
fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
    let optional_bytes: usize = [&object_meta.e_tag, &object_meta.version]
        .into_iter()
        .flatten()
        .map(String::len)
        .sum();
    object_meta.location.as_ref().len() + optional_bytes
}
/// Default memory budget of the list-files cache: 1 MiB.
pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024;
/// Default time-to-live: `None`, i.e. entries never expire.
pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None;
/// Cache key: a listing path, optionally associated with the table that owns
/// it so that all of a table's entries can be dropped together.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct TableScopedPath {
    /// Owning table, if any; matched by `drop_table_entries`.
    pub table: Option<TableReference>,
    /// Base path whose listing is cached.
    pub path: Path,
}
/// Mutable state of [`DefaultListFilesCache`], always accessed under its mutex.
pub struct DefaultListFilesCacheState {
    /// Entries in least-recently-used order.
    lru_queue: LruQueue<TableScopedPath, ListFilesEntry>,
    /// Maximum total `size_bytes` of cached entries.
    memory_limit: usize,
    /// Sum of `size_bytes` over all entries currently in `lru_queue`.
    memory_used: usize,
    /// Time-to-live applied when an entry is inserted; `None` disables expiry.
    ttl: Option<Duration>,
}
/// Empty state using [`DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT`] and
/// [`DEFAULT_LIST_FILES_CACHE_TTL`].
impl Default for DefaultListFilesCacheState {
    fn default() -> Self {
        Self::new(
            DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
            DEFAULT_LIST_FILES_CACHE_TTL,
        )
    }
}
impl DefaultListFilesCacheState {
    /// Creates an empty state with a memory budget of `memory_limit` bytes and
    /// an optional time-to-live applied to entries inserted afterwards.
    fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
        Self {
            lru_queue: LruQueue::new(),
            memory_limit,
            memory_used: 0,
            ttl,
        }
    }

    /// Looks up the listing cached for `table_scoped_base_path`, optionally
    /// narrowed to the files under `prefix`, which is relative to the key's path.
    ///
    /// The lookup goes through `LruQueue::get`, so a hit refreshes the entry's
    /// recency. Returns `None` in three cases:
    /// - there is no entry;
    /// - the entry expired before `now` (it is removed);
    /// - no cached file lies under `prefix`.
    ///
    /// Prefix matching works on whole path segments, like `ObjectStore::list`
    /// (which is used when the cache misses). Prefix `a=1` matches
    /// `base/a=1/file` but not `base/a=10/file`.
    fn get_with_prefix(
        &mut self,
        table_scoped_base_path: &TableScopedPath,
        prefix: Option<&Path>,
        now: Instant,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        let entry = self.lru_queue.get(table_scoped_base_path)?;
        if let Some(exp) = entry.expires
            && now > exp
        {
            self.remove(table_scoped_base_path);
            return None;
        }
        let Some(prefix) = prefix else {
            return Some(Arc::clone(&entry.metas));
        };

        // Absolute prefix: table base path followed by the relative prefix.
        let mut parts: Vec<_> = table_scoped_base_path.path.parts().collect();
        parts.extend(prefix.parts());
        let full_prefix = Path::from_iter(parts);

        // `prefix_matches` compares whole segments; a raw `str::starts_with`
        // would wrongly let `a=1` match sibling directories such as `a=10`.
        let filtered: Vec<ObjectMeta> = entry
            .metas
            .iter()
            .filter(|meta| meta.location.prefix_matches(&full_prefix))
            .cloned()
            .collect();
        if filtered.is_empty() {
            None
        } else {
            Some(Arc::new(filtered))
        }
    }

    /// Returns whether a non-expired entry exists for `k` at `now`.
    ///
    /// Uses `LruQueue::peek`, so the entry's recency is not refreshed. An
    /// expired entry is removed and reported as absent.
    fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool {
        let Some(entry) = self.lru_queue.peek(k) else {
            return false;
        };
        match entry.expires {
            Some(exp) if now > exp => {
                self.remove(k);
                false
            }
            _ => true,
        }
    }

    /// Stores `value` as the listing for `key`, stamping its expiry from `now`.
    /// It then evicts least recently used entries until the cache fits within
    /// `memory_limit`.
    ///
    /// Returns the listing previously cached for `key`, if any.
    ///
    /// Some listings cannot be cached: an empty one, which
    /// `ListFilesEntry::try_new` rejects, or one larger than the whole
    /// `memory_limit`. Such a listing is not stored. Any existing entry for
    /// `key` is removed and returned instead, because it no longer reflects
    /// the latest listing and would otherwise be served as stale data.
    fn put(
        &mut self,
        key: &TableScopedPath,
        value: Arc<Vec<ObjectMeta>>,
        now: Instant,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        let entry = match ListFilesEntry::try_new(value, self.ttl, now) {
            Some(entry) if entry.size_bytes <= self.memory_limit => entry,
            // Not cacheable: invalidate whatever was cached for this key.
            _ => return self.remove(key),
        };
        let entry_size = entry.size_bytes;
        let old_value = self.lru_queue.put(key.clone(), entry);
        self.memory_used += entry_size;
        if let Some(old) = &old_value {
            self.memory_used -= old.size_bytes;
        }
        self.evict_entries();
        old_value.map(|old| old.metas)
    }

    /// Pops least recently used entries until `memory_used <= memory_limit`.
    fn evict_entries(&mut self) {
        while self.memory_used > self.memory_limit {
            if let Some(removed) = self.lru_queue.pop() {
                self.memory_used -= removed.1.size_bytes;
            } else {
                debug_assert!(
                    false,
                    "cache is empty while memory_used > memory_limit, cannot happen"
                );
                return;
            }
        }
    }

    /// Removes the entry for `k`, releasing its memory accounting. Returns the
    /// removed listing.
    fn remove(&mut self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
        if let Some(entry) = self.lru_queue.remove(k) {
            self.memory_used -= entry.size_bytes;
            Some(entry.metas)
        } else {
            None
        }
    }

    /// Number of stored entries, including any not yet detected as expired.
    fn len(&self) -> usize {
        self.lru_queue.len()
    }

    /// Drops every entry and resets the memory accounting.
    fn clear(&mut self) {
        self.lru_queue.clear();
        self.memory_used = 0;
    }
}
impl ListFilesCache for DefaultListFilesCache {
    /// Current memory budget in bytes.
    fn cache_limit(&self) -> usize {
        self.state.lock().unwrap().memory_limit
    }

    /// Time-to-live applied to newly inserted entries.
    fn cache_ttl(&self) -> Option<Duration> {
        self.state.lock().unwrap().ttl
    }

    /// Sets a new memory budget and evicts least recently used entries until
    /// the cache fits within it.
    fn update_cache_limit(&self, limit: usize) {
        let mut guard = self.state.lock().unwrap();
        guard.memory_limit = limit;
        guard.evict_entries();
    }

    /// Sets the time-to-live used for entries inserted from now on. Entries
    /// already cached keep the expiry computed when they were inserted.
    fn update_cache_ttl(&self, ttl: Option<Duration>) {
        let mut guard = self.state.lock().unwrap();
        guard.ttl = ttl;
        guard.evict_entries();
    }

    /// Snapshot of every stored entry, including any not yet detected as
    /// expired.
    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry> {
        let guard = self.state.lock().unwrap();
        let mut snapshot = HashMap::new();
        snapshot.extend(
            guard
                .lru_queue
                .list_entries()
                .into_iter()
                .map(|(scoped_path, entry)| (scoped_path.clone(), entry.clone())),
        );
        snapshot
    }

    /// Removes every entry whose key belongs to `table_ref`.
    fn drop_table_entries(
        &self,
        table_ref: &Option<TableReference>,
    ) -> datafusion_common::Result<()> {
        let mut guard = self.state.lock().unwrap();
        // Collect the keys first: the queue cannot be mutated while iterating it.
        let doomed: Vec<TableScopedPath> = guard
            .lru_queue
            .list_entries()
            .into_iter()
            .filter(|(scoped_path, _)| scoped_path.table == *table_ref)
            .map(|(scoped_path, _)| scoped_path.clone())
            .collect();
        for scoped_path in &doomed {
            guard.remove(scoped_path);
        }
        Ok(())
    }
}
impl CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
    /// Optional prefix, relative to the key's path, used to narrow lookups.
    type Extra = Option<Path>;

    /// Full cached listing for `k`, or `None` if absent or expired.
    fn get(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
        let no_prefix: Self::Extra = None;
        self.get_with_extra(k, &no_prefix)
    }

    /// Cached listing for `table_scoped_path`, restricted to `prefix` when one
    /// is given; see `DefaultListFilesCacheState::get_with_prefix`.
    fn get_with_extra(
        &self,
        table_scoped_path: &TableScopedPath,
        prefix: &Self::Extra,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        let mut guard = self.state.lock().unwrap();
        let current = self.time_provider.now();
        guard.get_with_prefix(table_scoped_path, prefix.as_ref(), current)
    }

    /// Caches `value` for `key`, returning the previously cached listing.
    fn put(
        &self,
        key: &TableScopedPath,
        value: Arc<Vec<ObjectMeta>>,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        let mut guard = self.state.lock().unwrap();
        let current = self.time_provider.now();
        guard.put(key, value, current)
    }

    /// Same as `put`; the extra value is not used when storing.
    fn put_with_extra(
        &self,
        key: &TableScopedPath,
        value: Arc<Vec<ObjectMeta>>,
        _e: &Self::Extra,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        self.put(key, value)
    }

    /// Removes and returns the listing cached for `k`.
    fn remove(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
        self.state.lock().unwrap().remove(k)
    }

    /// Whether a non-expired entry exists for `k`; does not refresh recency.
    fn contains_key(&self, k: &TableScopedPath) -> bool {
        let mut guard = self.state.lock().unwrap();
        let current = self.time_provider.now();
        guard.contains_key(k, current)
    }

    /// Number of stored entries.
    fn len(&self) -> usize {
        self.state.lock().unwrap().len()
    }

    /// Drops every entry.
    fn clear(&self) {
        self.state.lock().unwrap().clear();
    }

    fn name(&self) -> String {
        "DefaultListFilesCache".to_string()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::DateTime;

    /// Deterministic [`TimeProvider`] whose clock only moves via [`MockTimeProvider::inc`].
    struct MockTimeProvider {
        base: Instant,
        offset: Mutex<Duration>,
    }

    impl MockTimeProvider {
        fn new() -> Self {
            Self {
                base: Instant::now(),
                offset: Mutex::new(Duration::ZERO),
            }
        }

        /// Advances the mock clock by `duration`.
        fn inc(&self, duration: Duration) {
            *self.offset.lock().unwrap() += duration;
        }
    }

    impl TimeProvider for MockTimeProvider {
        fn now(&self) -> Instant {
            self.base + *self.offset.lock().unwrap()
        }
    }

    /// Cache key for `path` scoped to the table named `table`.
    fn scoped_key(table: &str, path: Path) -> TableScopedPath {
        TableScopedPath {
            table: Some(TableReference::from(table)),
            path,
        }
    }

    /// Object metadata at `location` with fixed, otherwise irrelevant fields.
    fn create_object_meta_with_path(location: &str) -> ObjectMeta {
        ObjectMeta {
            location: Path::from(location),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size: 1024,
            e_tag: None,
            version: None,
        }
    }

    /// Object metadata whose location is `path` right-padded with `'0'` to
    /// `location_size` bytes (never truncated), so its heap size is predictable.
    fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta {
        let padding = "0".repeat(location_size.saturating_sub(path.len()));
        create_object_meta_with_path(&format!("{path}{padding}"))
    }

    /// Builds `count` objects of `meta_size` location bytes each. Returns the
    /// base path, the listing, and the size the cache is expected to account
    /// for it.
    fn create_test_list_files_entry(
        path: &str,
        count: usize,
        meta_size: usize,
    ) -> (Path, Arc<Vec<ObjectMeta>>, usize) {
        let metas: Vec<ObjectMeta> = (0..count)
            .map(|i| create_test_object_meta(&format!("file{i}"), meta_size))
            .collect();
        let metas = Arc::new(metas);
        let size = (metas.capacity() * size_of::<ObjectMeta>())
            + metas.iter().map(meta_heap_bytes).sum::<usize>();
        (Path::from(path), metas, size)
    }

    #[test]
    fn test_basic_operations() {
        let cache = DefaultListFilesCache::default();
        let key = scoped_key("table", Path::from("test_path"));

        // Empty cache.
        assert!(cache.get(&key).is_none());
        assert!(!cache.contains_key(&key));
        assert_eq!(cache.len(), 0);

        // Single put / get / remove round trip.
        let meta = create_test_object_meta("file1", 50);
        let value = Arc::new(vec![meta.clone()]);
        cache.put(&key, Arc::clone(&value));
        assert!(cache.contains_key(&key));
        assert_eq!(cache.len(), 1);
        let retrieved = cache.get(&key).unwrap();
        assert_eq!(retrieved.len(), 1);
        assert_eq!(retrieved[0].location, meta.location);

        let removed = cache.remove(&key).unwrap();
        assert_eq!(removed.len(), 1);
        assert!(!cache.contains_key(&key));
        assert_eq!(cache.len(), 0);

        // Several entries are reported by `list_entries` and dropped by `clear`.
        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
        let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50);
        let key1 = scoped_key("table", path1);
        let key2 = scoped_key("table", path2);
        cache.put(&key1, Arc::clone(&value1));
        cache.put(&key2, Arc::clone(&value2));
        assert_eq!(cache.len(), 2);
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                (
                    key1.clone(),
                    ListFilesEntry {
                        metas: value1,
                        size_bytes: size1,
                        expires: None,
                    }
                ),
                (
                    key2.clone(),
                    ListFilesEntry {
                        metas: value2,
                        size_bytes: size2,
                        expires: None,
                    }
                ),
            ])
        );

        cache.clear();
        assert_eq!(cache.len(), 0);
        assert!(!cache.contains_key(&key1));
        assert!(!cache.contains_key(&key2));
    }

    #[test]
    fn test_lru_eviction_basic() {
        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
        let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
        // Room for exactly three equally sized entries.
        let cache = DefaultListFilesCache::new(size * 3, None);
        let key1 = scoped_key("table", path1);
        let key2 = scoped_key("table", path2);
        let key3 = scoped_key("table", path3);
        let key4 = scoped_key("table", path4);

        cache.put(&key1, value1);
        cache.put(&key2, value2);
        cache.put(&key3, value3);
        assert_eq!(cache.len(), 3);
        // `contains_key` peeks, so it does not change the LRU order.
        assert!(cache.contains_key(&key1));
        assert!(cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));

        // A fourth entry evicts the least recently used one: key1.
        cache.put(&key4, value4);
        assert_eq!(cache.len(), 3);
        assert!(!cache.contains_key(&key1));
        assert!(cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
        assert!(cache.contains_key(&key4));
    }

    #[test]
    fn test_lru_ordering_after_access() {
        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
        let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
        let cache = DefaultListFilesCache::new(size * 3, None);
        let key1 = scoped_key("table", path1);
        let key2 = scoped_key("table", path2);
        let key3 = scoped_key("table", path3);
        let key4 = scoped_key("table", path4);

        cache.put(&key1, value1);
        cache.put(&key2, value2);
        cache.put(&key3, value3);
        assert_eq!(cache.len(), 3);

        // Reading key1 makes it most recently used, so key2 becomes the victim.
        assert!(cache.get(&key1).is_some());
        cache.put(&key4, value4);
        assert_eq!(cache.len(), 3);
        assert!(cache.contains_key(&key1));
        assert!(!cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
        assert!(cache.contains_key(&key4));
    }

    #[test]
    fn test_reject_too_large() {
        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
        let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000);
        let cache = DefaultListFilesCache::new(size * 2, None);
        let key1 = scoped_key("table", path1);
        let key2 = scoped_key("table", path2);
        let key_large = scoped_key("table", path_large);

        cache.put(&key1, value1);
        cache.put(&key2, value2);
        assert_eq!(cache.len(), 2);

        // An entry larger than the whole budget is refused without evicting anything.
        cache.put(&key_large, value_large);
        assert!(!cache.contains_key(&key_large));
        assert_eq!(cache.len(), 2);
        assert!(cache.contains_key(&key1));
        assert!(cache.contains_key(&key2));
    }

    #[test]
    fn test_multiple_evictions() {
        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
        let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200);
        let cache = DefaultListFilesCache::new(size * 3, None);
        let key1 = scoped_key("table", path1);
        let key2 = scoped_key("table", path2);
        let key3 = scoped_key("table", path3);
        let key_large = scoped_key("table", path_large);

        cache.put(&key1, value1);
        cache.put(&key2, value2);
        cache.put(&key3, value3);
        assert_eq!(cache.len(), 3);

        // The larger entry needs the space of two small ones.
        cache.put(&key_large, value_large);
        assert_eq!(cache.len(), 2);
        assert!(!cache.contains_key(&key1));
        assert!(!cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
        assert!(cache.contains_key(&key_large));
    }

    #[test]
    fn test_cache_limit_resize() {
        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
        let cache = DefaultListFilesCache::new(size * 3, None);
        let key1 = scoped_key("table", path1);
        let key2 = scoped_key("table", path2);
        let key3 = scoped_key("table", path3);

        cache.put(&key1, value1);
        cache.put(&key2, value2);
        cache.put(&key3, value3);
        assert_eq!(cache.len(), 3);

        // Shrinking the budget evicts immediately, oldest first.
        cache.update_cache_limit(size);
        assert_eq!(cache.len(), 1);
        assert!(cache.contains_key(&key3));
        assert!(!cache.contains_key(&key1));
        assert!(!cache.contains_key(&key2));
    }

    #[test]
    fn test_entry_update_with_size_change() {
        let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100);
        let cache = DefaultListFilesCache::new(size * 3, None);
        let key1 = scoped_key("table", path1);
        let key2 = scoped_key("table", path2);
        let key3 = scoped_key("table", path3);

        cache.put(&key1, value1);
        cache.put(&key2, Arc::clone(&value2));
        cache.put(&key3, value3_v1);
        assert_eq!(cache.len(), 3);

        // Replacing with an equally sized listing keeps everything.
        let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100);
        cache.put(&key3, value3_v2);
        assert_eq!(cache.len(), 3);
        assert!(cache.contains_key(&key1));
        assert!(cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));

        // Growing key3 pushes out the least recently used entry, key1.
        let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200);
        cache.put(&key3, Arc::clone(&value3_v3));
        assert_eq!(cache.len(), 2);
        assert!(!cache.contains_key(&key1));
        assert!(cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                (
                    key2,
                    ListFilesEntry {
                        metas: value2,
                        size_bytes: size2,
                        expires: None,
                    }
                ),
                (
                    key3,
                    ListFilesEntry {
                        metas: value3_v3,
                        size_bytes: size3_v3,
                        expires: None,
                    }
                ),
            ])
        );
    }

    #[test]
    fn test_cache_with_ttl() {
        let ttl = Duration::from_millis(100);
        let mock_time = Arc::new(MockTimeProvider::new());
        let cache = DefaultListFilesCache::new(10000, Some(ttl))
            .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
        let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);
        let key1 = scoped_key("table", path1);
        let key2 = scoped_key("table", path2);

        cache.put(&key1, Arc::clone(&value1));
        cache.put(&key2, Arc::clone(&value2));
        assert!(cache.get(&key1).is_some());
        assert!(cache.get(&key2).is_some());

        // The mock clock has not moved, so both entries share one expiry.
        let expires = mock_time.now().checked_add(ttl);
        assert_eq!(
            cache.list_entries(),
            HashMap::from([
                (
                    key1.clone(),
                    ListFilesEntry {
                        metas: value1,
                        size_bytes: size1,
                        expires,
                    }
                ),
                (
                    key2.clone(),
                    ListFilesEntry {
                        metas: value2,
                        size_bytes: size2,
                        expires,
                    }
                ),
            ])
        );

        // Past the TTL, each lookup removes the expired entry it encounters.
        mock_time.inc(Duration::from_millis(150));
        assert!(cache.get(&key1).is_none());
        assert_eq!(cache.len(), 1);
        assert!(!cache.contains_key(&key2));
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn test_cache_with_ttl_and_lru() {
        let ttl = Duration::from_millis(200);
        let mock_time = Arc::new(MockTimeProvider::new());
        let cache = DefaultListFilesCache::new(1000, Some(ttl))
            .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
        let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);
        let key1 = scoped_key("table", path1);
        let key2 = scoped_key("table", path2);
        let key3 = scoped_key("table", path3);

        // Staggered inserts; only two entries fit, so key1 is evicted by size.
        cache.put(&key1, value1);
        mock_time.inc(Duration::from_millis(50));
        cache.put(&key2, value2);
        mock_time.inc(Duration::from_millis(50));
        cache.put(&key3, value3);
        assert!(!cache.contains_key(&key1));
        assert!(cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));

        // At +251ms key2 (inserted at +50ms) has expired; key3 (+100ms) has not.
        mock_time.inc(Duration::from_millis(151));
        assert!(!cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
    }

    #[test]
    fn test_meta_heap_bytes_calculation() {
        let meta = |e_tag: Option<&str>, version: Option<&str>| ObjectMeta {
            location: Path::from("test"),
            last_modified: chrono::Utc::now(),
            size: 100,
            e_tag: e_tag.map(String::from),
            version: version.map(String::from),
        };
        assert_eq!(meta_heap_bytes(&meta(None, None)), 4);
        assert_eq!(meta_heap_bytes(&meta(Some("etag123"), None)), 4 + 7);
        assert_eq!(meta_heap_bytes(&meta(None, Some("v1.0"))), 4 + 4);
        assert_eq!(meta_heap_bytes(&meta(Some("tag"), Some("ver"))), 4 + 3 + 3);
    }

    #[test]
    fn test_entry_creation() {
        let now = Instant::now();

        // Empty listings are never turned into entries.
        let empty_vec: Arc<Vec<ObjectMeta>> = Arc::new(vec![]);
        assert!(ListFilesEntry::try_new(empty_vec, None, now).is_none());

        let metas: Vec<ObjectMeta> = (0..5)
            .map(|i| create_test_object_meta(&format!("file{i}"), 30))
            .collect();
        let entry = ListFilesEntry::try_new(Arc::new(metas), None, now).unwrap();
        assert_eq!(entry.metas.len(), 5);
        let expected_size =
            (entry.metas.capacity() * size_of::<ObjectMeta>()) + (entry.metas.len() * 30);
        assert_eq!(entry.size_bytes, expected_size);

        let meta = create_test_object_meta("file", 50);
        let ttl = Duration::from_secs(10);
        let entry =
            ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl), now).unwrap();
        assert!(entry.expires.unwrap() > now);
    }

    #[test]
    fn test_memory_tracking() {
        fn memory_used(cache: &DefaultListFilesCache) -> usize {
            cache.state.lock().unwrap().memory_used
        }

        let cache = DefaultListFilesCache::new(1000, None);
        assert_eq!(memory_used(&cache), 0);

        let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100);
        let key1 = scoped_key("table", path1);
        cache.put(&key1, value1);
        assert_eq!(memory_used(&cache), size1);

        let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200);
        let key2 = scoped_key("table", path2);
        cache.put(&key2, value2);
        assert_eq!(memory_used(&cache), size1 + size2);

        cache.remove(&key1);
        assert_eq!(memory_used(&cache), size2);

        cache.clear();
        assert_eq!(memory_used(&cache), 0);
    }

    #[test]
    fn test_put_returns_previous_value() {
        let cache = DefaultListFilesCache::default();
        let (path, first, _) = create_test_list_files_entry("path", 1, 10);
        let (_, second, _) = create_test_list_files_entry("path", 2, 10);
        let key = scoped_key("table", path);

        assert!(cache.put(&key, Arc::clone(&first)).is_none());
        let previous = cache.put(&key, Arc::clone(&second)).unwrap();
        assert!(Arc::ptr_eq(&previous, &first));
        assert_eq!(cache.get(&key).unwrap().len(), 2);
    }

    #[test]
    fn test_prefix_aware_cache_hit() {
        let cache = DefaultListFilesCache::new(100000, None);
        let files = Arc::new(vec![
            create_object_meta_with_path("my_table/a=1/file1.parquet"),
            create_object_meta_with_path("my_table/a=1/file2.parquet"),
            create_object_meta_with_path("my_table/a=2/file3.parquet"),
            create_object_meta_with_path("my_table/a=2/file4.parquet"),
        ]);
        let key = scoped_key("table", Path::from("my_table"));
        cache.put(&key, files);

        for (partition, expected_prefix) in [("a=1", "my_table/a=1"), ("a=2", "my_table/a=2")] {
            let filtered = cache
                .get_with_extra(&key, &Some(Path::from(partition)))
                .expect("partition should be served from the cache");
            assert_eq!(filtered.len(), 2);
            assert!(
                filtered
                    .iter()
                    .all(|m| m.location.as_ref().starts_with(expected_prefix))
            );
        }
    }

    #[test]
    fn test_prefix_aware_cache_no_filter_returns_all() {
        let cache = DefaultListFilesCache::new(100000, None);
        let full_files = Arc::new(vec![
            create_object_meta_with_path("my_table/a=1/file1.parquet"),
            create_object_meta_with_path("my_table/a=1/file2.parquet"),
            create_object_meta_with_path("my_table/a=2/file3.parquet"),
            create_object_meta_with_path("my_table/a=2/file4.parquet"),
        ]);
        let key = scoped_key("table", Path::from("my_table"));
        cache.put(&key, full_files);

        let files = cache.get_with_extra(&key, &None).unwrap();
        assert_eq!(files.len(), 4);
        assert_eq!(cache.get(&key).unwrap().len(), 4);
    }

    #[test]
    fn test_prefix_aware_cache_miss_no_entry() {
        let cache = DefaultListFilesCache::new(100000, None);
        let key = scoped_key("table", Path::from("my_table"));

        assert!(cache.get_with_extra(&key, &None).is_none());
        assert!(
            cache
                .get_with_extra(&key, &Some(Path::from("a=1")))
                .is_none()
        );
    }

    #[test]
    fn test_prefix_aware_cache_no_matching_files() {
        let cache = DefaultListFilesCache::new(100000, None);
        let files = Arc::new(vec![
            create_object_meta_with_path("my_table/a=1/file1.parquet"),
            create_object_meta_with_path("my_table/a=2/file2.parquet"),
        ]);
        let key = scoped_key("table", Path::from("my_table"));
        cache.put(&key, files);

        // No cached file under the prefix is reported as a miss.
        assert!(
            cache
                .get_with_extra(&key, &Some(Path::from("a=3")))
                .is_none()
        );
    }

    #[test]
    fn test_prefix_aware_nested_partitions() {
        let cache = DefaultListFilesCache::new(100000, None);
        let files = Arc::new(vec![
            create_object_meta_with_path("events/year=2024/month=01/day=01/file1.parquet"),
            create_object_meta_with_path("events/year=2024/month=01/day=02/file2.parquet"),
            create_object_meta_with_path("events/year=2024/month=02/day=01/file3.parquet"),
            create_object_meta_with_path("events/year=2025/month=01/day=01/file4.parquet"),
        ]);
        let key = scoped_key("table", Path::from("events"));
        cache.put(&key, files);

        for (prefix, expected) in [
            ("year=2024/month=01", 2),
            ("year=2024", 3),
            ("year=2024/month=01/day=01", 1),
        ] {
            let result = cache.get_with_extra(&key, &Some(Path::from(prefix)));
            assert!(result.is_some(), "expected a hit for prefix {prefix}");
            assert_eq!(result.unwrap().len(), expected, "prefix {prefix}");
        }
    }

    #[test]
    fn test_prefix_aware_different_tables() {
        let cache = DefaultListFilesCache::new(100000, None);
        let files_a = Arc::new(vec![create_object_meta_with_path(
            "table_a/part=1/file1.parquet",
        )]);
        let files_b = Arc::new(vec![
            create_object_meta_with_path("table_b/part=1/file1.parquet"),
            create_object_meta_with_path("table_b/part=2/file2.parquet"),
        ]);
        let key_a = scoped_key("table_a", Path::from("table_a"));
        let key_b = scoped_key("table_b", Path::from("table_b"));
        cache.put(&key_a, files_a);
        cache.put(&key_b, files_b);

        let result_a = cache.get(&key_a);
        assert!(result_a.is_some());
        assert_eq!(result_a.unwrap().len(), 1);

        let result_b = cache.get_with_extra(&key_b, &Some(Path::from("part=1")));
        assert!(result_b.is_some());
        assert_eq!(result_b.unwrap().len(), 1);
    }

    #[test]
    fn test_drop_table_entries() {
        let cache = DefaultListFilesCache::default();
        let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100);
        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
        let table_ref1 = Some(TableReference::from("table1"));
        let key1 = scoped_key("table1", path1);
        let key2 = scoped_key("table1", path2);
        let key3 = scoped_key("table2", path3);

        cache.put(&key1, value1);
        cache.put(&key2, value2);
        cache.put(&key3, value3);

        // Only table1's entries are dropped.
        cache.drop_table_entries(&table_ref1).unwrap();
        assert!(!cache.contains_key(&key1));
        assert!(!cache.contains_key(&key2));
        assert!(cache.contains_key(&key3));
    }
}