//! `caching_store.rs` — caching `ObjectStore` wrapper (scouter_dataframe crate).

1use async_trait::async_trait;
2use bytes::Bytes;
3use futures::stream::BoxStream;
4use mini_moka::sync::Cache;
5use object_store::path::Path;
6use object_store::{
7    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
8    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
9};
10use std::fmt;
11use std::sync::Arc;
12use std::time::Duration;
13
/// Cache key for range reads: (path, start, end).
///
/// `path` is stored as `Arc<str>` so cloning the key — which the cache does
/// internally — is a refcount bump rather than a string copy. `start`/`end`
/// mirror the half-open byte range passed to `get_range`, so two reads of
/// different ranges on the same path cache independently.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
struct RangeCacheKey {
    // Object path, as produced by `Path::to_string()`.
    path: Arc<str>,
    // Inclusive start offset of the requested byte range.
    start: u64,
    // Exclusive end offset of the requested byte range.
    end: u64,
}
21
/// Maximum size of a single range read that will be cached (2 MB).
/// Parquet footers are typically well under this; column data reads are larger
/// and will pass through uncached. Reads above this threshold are forwarded
/// directly to the inner store without touching the cache (see `get_range`).
const MAX_CACHEABLE_BYTES: u64 = 2 * 1024 * 1024;
26
27/// An `ObjectStore` wrapper that caches `head()` and small `get_range()` responses.
28///
29/// After Z-ORDER compaction the Parquet files backing Delta tables are immutable:
30/// the same path always returns the same bytes. Caching the metadata calls that
31/// DataFusion issues on every query (HEAD for file size, then GET-range for the
32/// footer) eliminates redundant cloud round-trips.
33///
34/// All mutating and streaming methods delegate directly to the inner store.
35#[derive(Debug)]
36pub struct CachingStore<T: ObjectStore> {
37    inner: T,
38    /// path → ObjectMeta
39    head_cache: Cache<Arc<str>, ObjectMeta>,
40    /// (path, start, end) → Bytes
41    range_cache: Cache<RangeCacheKey, Bytes>,
42}
43
44impl<T: ObjectStore> CachingStore<T> {
45    /// Create a new caching wrapper.
46    ///
47    /// * `inner` – the concrete object store to wrap.
48    /// * `range_cache_max_bytes` – maximum total weight of the range cache
49    ///   (each entry is weighed by its byte length).
50    pub fn new(inner: T, range_cache_max_bytes: u64) -> Self {
51        let ttl = Duration::from_secs(3600); // 1 hour
52
53        let head_cache = Cache::builder()
54            .max_capacity(10_000)
55            .time_to_live(ttl)
56            .build();
57
58        let range_cache = Cache::builder()
59            .max_capacity(range_cache_max_bytes)
60            .weigher(|_key: &RangeCacheKey, value: &Bytes| -> u32 {
61                // Clamp to u32::MAX for the weigher contract.
62                value.len().min(u32::MAX as usize) as u32
63            })
64            .time_to_live(ttl)
65            .build();
66
67        Self {
68            inner,
69            head_cache,
70            range_cache,
71        }
72    }
73}
74
75impl<T: ObjectStore> fmt::Display for CachingStore<T> {
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        write!(f, "CachingStore({})", self.inner)
78    }
79}
80
81#[async_trait]
82impl<T: ObjectStore> ObjectStore for CachingStore<T> {
83    // ── Passthrough (mutating / streaming) ──────────────────────────────
84
85    async fn put_opts(
86        &self,
87        location: &Path,
88        payload: PutPayload,
89        opts: PutOptions,
90    ) -> Result<PutResult> {
91        self.inner.put_opts(location, payload, opts).await
92    }
93
94    async fn put_multipart_opts(
95        &self,
96        location: &Path,
97        opts: PutMultipartOptions,
98    ) -> Result<Box<dyn MultipartUpload>> {
99        self.inner.put_multipart_opts(location, opts).await
100    }
101
102    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
103        self.inner.get_opts(location, options).await
104    }
105
106    async fn delete(&self, location: &Path) -> Result<()> {
107        // Invalidate caches for the deleted path.
108        let key: Arc<str> = location.to_string().into();
109        self.head_cache.invalidate(&key);
110        self.inner.delete(location).await
111    }
112
113    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
114        self.inner.list(prefix)
115    }
116
117    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
118        self.inner.list_with_delimiter(prefix).await
119    }
120
121    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
122        self.inner.copy(from, to).await
123    }
124
125    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
126        self.inner.copy_if_not_exists(from, to).await
127    }
128
129    // ── Cached methods ──────────────────────────────────────────────────
130
131    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
132        let key: Arc<str> = location.to_string().into();
133
134        if let Some(meta) = self.head_cache.get(&key) {
135            return Ok(meta);
136        }
137
138        let meta = self.inner.head(location).await?;
139        self.head_cache.insert(key, meta.clone());
140        Ok(meta)
141    }
142
143    async fn get_range(&self, location: &Path, range: std::ops::Range<u64>) -> Result<Bytes> {
144        let len = range.end.saturating_sub(range.start);
145
146        // Only cache small reads (footer-sized). Large column data reads pass through.
147        if len > MAX_CACHEABLE_BYTES {
148            return self.inner.get_range(location, range).await;
149        }
150
151        let key = RangeCacheKey {
152            path: location.to_string().into(),
153            start: range.start,
154            end: range.end,
155        };
156
157        if let Some(bytes) = self.range_cache.get(&key) {
158            return Ok(bytes);
159        }
160
161        let bytes = self.inner.get_range(location, range).await?;
162        self.range_cache.insert(key, bytes.clone());
163        Ok(bytes)
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;
    use object_store::PutPayload;

    /// `head()` returns consistent metadata across repeated calls.
    ///
    /// NOTE(review): without instrumenting the inner store there is no way to
    /// observe that the second call actually hit the cache — this only checks
    /// that repeated calls agree. A call-counting wrapper store would make the
    /// caching assertion real.
    #[tokio::test]
    async fn head_is_cached() {
        let mem = InMemory::new();
        let path = Path::from("test/file.parquet");
        mem.put(&path, PutPayload::from_static(b"hello"))
            .await
            .unwrap();

        let store = CachingStore::new(mem, 64 * 1024 * 1024);

        // First call populates the cache.
        let meta1 = store.head(&path).await.unwrap();
        // Second call should return cached value.
        let meta2 = store.head(&path).await.unwrap();

        assert_eq!(meta1.size, meta2.size);
        assert_eq!(meta1.location, meta2.location);
    }

    /// Small (`<= MAX_CACHEABLE_BYTES`) range reads return the correct slice
    /// and are stable across repeated calls.
    ///
    /// NOTE(review): same limitation as above — the cache hit itself is not
    /// observable through the public API.
    #[tokio::test]
    async fn get_range_is_cached() {
        let mem = InMemory::new();
        let path = Path::from("test/file.parquet");
        let data = b"0123456789abcdef";
        mem.put(&path, PutPayload::from_static(data)).await.unwrap();

        let store = CachingStore::new(mem, 64 * 1024 * 1024);

        let bytes1 = store.get_range(&path, 4..10).await.unwrap();
        let bytes2 = store.get_range(&path, 4..10).await.unwrap();

        assert_eq!(bytes1, bytes2);
        // Half-open range 4..10 yields exactly these six bytes.
        assert_eq!(&bytes1[..], b"456789");
    }

    /// Reads larger than `MAX_CACHEABLE_BYTES` still succeed via passthrough.
    ///
    /// NOTE(review): this only proves the read works; it does not verify the
    /// bytes were skipped by the cache (not observable without inner-store
    /// instrumentation or a cache-size probe).
    #[tokio::test]
    async fn large_range_not_cached() {
        let mem = InMemory::new();
        let path = Path::from("test/big.parquet");
        let data = vec![0u8; 3 * 1024 * 1024]; // 3 MB — exceeds MAX_CACHEABLE_BYTES
        mem.put(&path, PutPayload::from(data)).await.unwrap();

        let store = CachingStore::new(mem, 64 * 1024 * 1024);

        // Should still work, just not be cached.
        let bytes = store.get_range(&path, 0..3 * 1024 * 1024).await.unwrap();
        assert_eq!(bytes.len(), 3 * 1024 * 1024);
    }
}