1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use bytes::Bytes;
use datafusion::parquet::errors::ParquetError;
use micromegas_tracing::prelude::*;
use object_store::ObjectStore;
use std::ops::Range;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use super::file_cache::FileCache;
/// Adds file content caching to object store reads.
///
/// This is an internal component used by `ParquetReader`, not a standalone `AsyncFileReader`.
/// It only provides `get_bytes` and `get_byte_ranges` methods - metadata handling remains
/// in the `ParquetReader` layer.
///
/// Uses a two-level caching strategy:
/// 1. Local `cached_data` - avoids global cache lookups within a single reader
/// 2. Global `FileCache` - shared across all readers, with thundering herd protection
pub struct CachingReader {
/// Object store for loading uncached files (shared, cloneable)
object_store: Arc<dyn ObjectStore>,
/// Path to the file in object store
path: object_store::path::Path,
/// Human-readable file identifier; also used as the global cache key
filename: String,
/// Total size of the file in bytes; used by `FileCache::should_cache` to
/// decide whether whole-file caching applies or reads bypass the cache
file_size: u64,
/// Global cache shared across all readers
file_cache: Arc<FileCache>,
/// Local cache of file data for this reader instance
cached_data: Option<Bytes>,
/// Whether the most recent read operation was served from cache
last_read_was_cache_hit: bool,
}
impl CachingReader {
/// Creates a new caching reader for a single file in the object store.
///
/// `filename` is used as the key into the global `FileCache`; `file_size`
/// drives the cache/bypass decision (`FileCache::should_cache`).
pub fn new(
object_store: Arc<dyn ObjectStore>,
path: object_store::path::Path,
filename: String,
file_size: u64,
file_cache: Arc<FileCache>,
) -> Self {
Self {
object_store,
path,
filename,
file_size,
file_cache,
cached_data: None,
last_read_was_cache_hit: false,
}
}
/// Returns whether the most recent read operation was served from cache.
pub fn last_read_was_cache_hit(&self) -> bool {
self.last_read_was_cache_hit
}
/// Load the whole file, using the global cache with thundering herd protection.
///
/// Returns the full file contents and sets `last_read_was_cache_hit`:
/// a "hit" means our loader closure never ran (data came from either the
/// local `cached_data` or the global `FileCache`).
async fn load_file_data(&mut self) -> datafusion::parquet::errors::Result<Bytes> {
// Check local cache first (avoids global cache lookup)
if let Some(data) = &self.cached_data {
self.last_read_was_cache_hit = true;
return Ok(data.clone());
}
// Use get_or_load for thundering herd protection - concurrent requests
// for the same file will coalesce into a single object store fetch.
// Track whether the loader was called to determine cache hit/miss.
let loader_was_called = Arc::new(AtomicBool::new(false));
let loader_was_called_clone = Arc::clone(&loader_was_called);
let object_store = Arc::clone(&self.object_store);
let path = self.path.clone();
let filename = self.filename.clone();
let file_size = self.file_size;
let data = self
.file_cache
.get_or_load(&self.filename, self.file_size, || {
loader_was_called_clone.store(true, Ordering::SeqCst);
async move {
// Fix: log the actual filename — the previous message printed a
// literal placeholder while the cloned `filename` went unused.
debug!("file_cache_load file={filename} file_size={file_size}");
let result = object_store.get(&path).await?;
result.bytes().await
}
})
.await
.map_err(|e| ParquetError::General(e.to_string()))?;
self.cached_data = Some(data.clone());
// Miss iff our loader actually executed; otherwise another reader (or the
// cache itself) supplied the bytes.
self.last_read_was_cache_hit = !loader_was_called.load(Ordering::SeqCst);
Ok(data)
}
/// Reads a single byte range from the file.
///
/// Small files (per `FileCache::should_cache`) are loaded whole and sliced;
/// large files are fetched directly from the object store, bypassing the cache.
///
/// NOTE(review): for the cached path, a `range` extending past `file_size`
/// panics in `Bytes::slice` — callers are presumed to pass in-bounds ranges
/// derived from parquet metadata; verify against callers.
pub async fn get_bytes(
&mut self,
range: Range<u64>,
) -> datafusion::parquet::errors::Result<Bytes> {
if self.file_cache.should_cache(self.file_size) {
let data = self.load_file_data().await?;
Ok(data.slice(range.start as usize..range.end as usize))
} else {
// Large file - read directly from object store (bypass cache)
self.last_read_was_cache_hit = false;
debug!(
"file_cache_skip file={} file_size={}",
self.filename, self.file_size
);
self.object_store
.get_range(&self.path, range)
.await
.map_err(|e| ParquetError::External(Box::new(e)))
}
}
/// Reads multiple byte ranges from the file.
///
/// Cached path slices the whole-file buffer once per range; uncached path
/// delegates to `ObjectStore::get_ranges` for an efficient multi-range fetch.
///
/// NOTE(review): same in-bounds assumption as `get_bytes` for the cached path.
pub async fn get_byte_ranges(
&mut self,
ranges: Vec<Range<u64>>,
) -> datafusion::parquet::errors::Result<Vec<Bytes>> {
if self.file_cache.should_cache(self.file_size) {
let data = self.load_file_data().await?;
Ok(ranges
.into_iter()
.map(|r| data.slice(r.start as usize..r.end as usize))
.collect())
} else {
// Large file - use object_store's get_ranges for efficient multi-range fetch
self.last_read_was_cache_hit = false;
debug!(
"file_cache_skip file={} file_size={}",
self.filename, self.file_size
);
self.object_store
.get_ranges(&self.path, &ranges)
.await
.map_err(|e| ParquetError::External(Box::new(e)))
}
}
}