scouter_dataframe/
caching_store.rs1use async_trait::async_trait;
2use bytes::Bytes;
3use futures::stream::BoxStream;
4use mini_moka::sync::Cache;
5use object_store::path::Path;
6use object_store::{
7 GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
8 PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
9};
10use std::fmt;
11use std::sync::Arc;
12use std::time::Duration;
13
/// Cache key identifying one requested byte range of one object.
///
/// Keys compare equal only when the path and both bounds match, so two
/// overlapping but non-identical ranges occupy separate cache entries.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct RangeCacheKey {
    /// Object path rendered as a string.
    path: Arc<str>,
    /// `start` bound of the requested range.
    start: u64,
    /// `end` bound of the requested range.
    end: u64,
}
21
/// Largest range length, in bytes, that `get_range` will place in the range cache (2 MiB).
/// Longer requests are forwarded to the wrapped store without being cached.
const MAX_CACHEABLE_BYTES: u64 = 2 << 20;
26
27#[derive(Debug)]
36pub struct CachingStore<T: ObjectStore> {
37 inner: T,
38 head_cache: Cache<Arc<str>, ObjectMeta>,
40 range_cache: Cache<RangeCacheKey, Bytes>,
42}
43
44impl<T: ObjectStore> CachingStore<T> {
45 pub fn new(inner: T, range_cache_max_bytes: u64) -> Self {
51 let ttl = Duration::from_secs(3600); let head_cache = Cache::builder()
54 .max_capacity(10_000)
55 .time_to_live(ttl)
56 .build();
57
58 let range_cache = Cache::builder()
59 .max_capacity(range_cache_max_bytes)
60 .weigher(|_key: &RangeCacheKey, value: &Bytes| -> u32 {
61 value.len().min(u32::MAX as usize) as u32
63 })
64 .time_to_live(ttl)
65 .build();
66
67 Self {
68 inner,
69 head_cache,
70 range_cache,
71 }
72 }
73}
74
75impl<T: ObjectStore> fmt::Display for CachingStore<T> {
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77 write!(f, "CachingStore({})", self.inner)
78 }
79}
80
81#[async_trait]
82impl<T: ObjectStore> ObjectStore for CachingStore<T> {
83 async fn put_opts(
86 &self,
87 location: &Path,
88 payload: PutPayload,
89 opts: PutOptions,
90 ) -> Result<PutResult> {
91 self.inner.put_opts(location, payload, opts).await
92 }
93
94 async fn put_multipart_opts(
95 &self,
96 location: &Path,
97 opts: PutMultipartOptions,
98 ) -> Result<Box<dyn MultipartUpload>> {
99 self.inner.put_multipart_opts(location, opts).await
100 }
101
102 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
103 self.inner.get_opts(location, options).await
104 }
105
106 async fn delete(&self, location: &Path) -> Result<()> {
107 let key: Arc<str> = location.to_string().into();
109 self.head_cache.invalidate(&key);
110 self.inner.delete(location).await
111 }
112
113 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
114 self.inner.list(prefix)
115 }
116
117 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
118 self.inner.list_with_delimiter(prefix).await
119 }
120
121 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
122 self.inner.copy(from, to).await
123 }
124
125 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
126 self.inner.copy_if_not_exists(from, to).await
127 }
128
129 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
132 let key: Arc<str> = location.to_string().into();
133
134 if let Some(meta) = self.head_cache.get(&key) {
135 return Ok(meta);
136 }
137
138 let meta = self.inner.head(location).await?;
139 self.head_cache.insert(key, meta.clone());
140 Ok(meta)
141 }
142
143 async fn get_range(&self, location: &Path, range: std::ops::Range<u64>) -> Result<Bytes> {
144 let len = range.end.saturating_sub(range.start);
145
146 if len > MAX_CACHEABLE_BYTES {
148 return self.inner.get_range(location, range).await;
149 }
150
151 let key = RangeCacheKey {
152 path: location.to_string().into(),
153 start: range.start,
154 end: range.end,
155 };
156
157 if let Some(bytes) = self.range_cache.get(&key) {
158 return Ok(bytes);
159 }
160
161 let bytes = self.inner.get_range(location, range).await?;
162 self.range_cache.insert(key, bytes.clone());
163 Ok(bytes)
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;
    use object_store::PutPayload;

    /// Range-cache capacity used by every test.
    const CACHE_BYTES: u64 = 64 * 1024 * 1024;

    /// Builds a caching wrapper plus a second handle to the same backing store.
    ///
    /// The raw handle lets a test change the backing store behind the wrapper's
    /// back, which is the only way to tell a cache hit from a pass-through read.
    fn shared_store() -> (Arc<dyn ObjectStore>, CachingStore<Arc<dyn ObjectStore>>) {
        let backing: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let cached = CachingStore::new(Arc::clone(&backing), CACHE_BYTES);
        (backing, cached)
    }

    #[tokio::test]
    async fn head_is_cached() {
        let (backing, store) = shared_store();
        let path = Path::from("test/file.parquet");
        backing
            .put(&path, PutPayload::from_static(b"hello"))
            .await
            .unwrap();

        let first = store.head(&path).await.unwrap();

        // Remove the object without going through the wrapper.
        backing.delete(&path).await.unwrap();
        assert!(backing.head(&path).await.is_err());

        // Only a cache hit can still answer.
        let second = store.head(&path).await.unwrap();
        assert_eq!(first.size, second.size);
        assert_eq!(first.location, second.location);
        assert_eq!(second.size, 5);
    }

    #[tokio::test]
    async fn delete_through_wrapper_evicts_head() {
        let (backing, store) = shared_store();
        let path = Path::from("test/file.parquet");
        backing
            .put(&path, PutPayload::from_static(b"hello"))
            .await
            .unwrap();

        store.head(&path).await.unwrap();
        store.delete(&path).await.unwrap();

        assert!(store.head(&path).await.is_err());
    }

    #[tokio::test]
    async fn get_range_is_cached() {
        let (backing, store) = shared_store();
        let path = Path::from("test/file.parquet");
        backing
            .put(&path, PutPayload::from_static(b"0123456789abcdef"))
            .await
            .unwrap();

        let first = store.get_range(&path, 4..10).await.unwrap();
        assert_eq!(&first[..], b"456789");

        // Remove the object without going through the wrapper.
        backing.delete(&path).await.unwrap();

        // Only a cache hit can still answer.
        let second = store.get_range(&path, 4..10).await.unwrap();
        assert_eq!(first, second);
    }

    #[tokio::test]
    async fn large_range_not_cached() {
        let (backing, store) = shared_store();
        let path = Path::from("test/big.parquet");
        let len: u64 = 3 * 1024 * 1024;
        backing
            .put(&path, PutPayload::from(vec![0u8; len as usize]))
            .await
            .unwrap();

        let bytes = store.get_range(&path, 0..len).await.unwrap();
        assert_eq!(bytes.len(), len as usize);

        // Remove the object without going through the wrapper.
        backing.delete(&path).await.unwrap();

        // The oversized read bypassed the cache, so nothing can answer now.
        assert!(store.get_range(&path, 0..len).await.is_err());
    }
}