//! In-memory cache for parquet file contents, built on `moka`.
use anyhow::bail;
use bytes::Bytes;
use micromegas_tracing::prelude::*;
use moka::future::Cache;
use moka::notification::RemovalCause;
use std::future::Future;
use std::sync::Arc;
/// Memory budget used by `FileCache::default()`: 200 MiB (`<< 20` multiplies by 1024 * 1024).
const DEFAULT_CACHE_SIZE_BYTES: u64 = 200 << 20;
/// Largest file considered cacheable by `FileCache::default()`: 10 MiB.
const DEFAULT_MAX_FILE_SIZE_BYTES: u64 = 10 << 20;
/// Value stored in the cache: the file contents together with the bookkeeping
/// used to weigh the entry and to measure how long it stayed cached.
#[derive(Clone)]
struct CacheEntry {
    /// Insertion timestamp, in ticks as returned by `now()`.
    inserted_at: i64,
    /// File size in bytes; the cache weigher uses this value as the entry's weight.
    file_size: u32,
    /// Contents of the cached file.
    data: Bytes,
}
/// Global LRU cache for parquet file contents, shared across all readers and queries.
///
/// The memory budget is expressed in bytes and each entry is weighed by its file size.
/// Loads go through moka's coalescing API, which prevents a thundering herd: concurrent
/// requests for the same uncached file result in a single load operation whose result
/// is shared by all waiters.
pub struct FileCache {
    /// Largest file size, in bytes, for which `should_cache` returns `true`.
    max_file_size: u64,
    /// Underlying moka cache, keyed by file path.
    cache: Cache<String, CacheEntry>,
}
impl Default for FileCache {
fn default() -> Self {
Self::new(DEFAULT_CACHE_SIZE_BYTES, DEFAULT_MAX_FILE_SIZE_BYTES)
}
}
impl FileCache {
/// Creates a new file cache with the specified memory budget and max file size.
pub fn new(max_capacity_bytes: u64, max_file_size_bytes: u64) -> Self {
let cache = Cache::builder()
.max_capacity(max_capacity_bytes)
.weigher(|_key: &String, entry: &CacheEntry| -> u32 { entry.file_size })
.eviction_listener(
|_key: Arc<String>, entry: CacheEntry, cause: RemovalCause| {
if cause == RemovalCause::Size {
// Track eviction delay: time between insertion and eviction due to size pressure
let eviction_delay = now() - entry.inserted_at;
imetric!("file_cache_eviction_delay", "ticks", eviction_delay as u64);
}
},
)
.build();
Self {
cache,
max_file_size: max_file_size_bytes,
}
}
/// Check if a file should be cached based on its size
pub fn should_cache(&self, file_size: u64) -> bool {
file_size <= self.max_file_size
}
/// Gets file contents, loading from the provided async function on cache miss.
///
/// Uses moka's `try_get_with` to coalesce concurrent requests - if multiple
/// callers request the same uncached file simultaneously, only one will
/// execute the loader while others wait for the result.
///
/// Returns an error if file_size >= 4GB (moka weigher uses u32).
pub async fn get_or_load<F, Fut, E>(
&self,
file_path: &str,
file_size: u64,
loader: F,
) -> anyhow::Result<Bytes>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<Bytes, E>>,
E: Send + Sync + std::error::Error + 'static,
{
if file_size > u32::MAX as u64 {
bail!(
"file too large to cache: {file_size} bytes (max {})",
u32::MAX
);
}
let file_size_u32 = file_size as u32;
// Note: entry_count may be stale under concurrent loads of different files (approximate metric)
let entry_count = self.cache.entry_count();
let result = self
.cache
.try_get_with(file_path.to_string(), async {
let data = loader().await.map_err(|e| anyhow::anyhow!(e))?;
imetric!("file_cache_entry_count", "count", entry_count + 1);
Ok::<_, anyhow::Error>(CacheEntry {
data,
file_size: file_size_u32,
inserted_at: now(),
})
})
.await
.map_err(|e: Arc<anyhow::Error>| anyhow::anyhow!("{e}"))?;
Ok(result.data.clone())
}
/// Returns cache statistics (entry_count, weighted_size_bytes).
pub fn stats(&self) -> (u64, u64) {
(self.cache.entry_count(), self.cache.weighted_size())
}
/// Runs pending cache maintenance tasks.
///
/// This should be called to ensure cache statistics are up-to-date,
/// particularly useful in test scenarios.
pub async fn run_pending_tasks(&self) {
self.cache.run_pending_tasks().await;
}
}
impl std::fmt::Debug for FileCache {
    /// Formats the cache as `FileCache { entries, weighted_size_bytes, max_file_size }`,
    /// using the current values reported by `stats()`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (entry_total, weighted_bytes) = self.stats();
        let mut out = f.debug_struct("FileCache");
        out.field("entries", &entry_total);
        out.field("weighted_size_bytes", &weighted_bytes);
        out.field("max_file_size", &self.max_file_size);
        out.finish()
    }
}