1use std::{
2 fmt::{Display, Formatter},
3 fs,
4 ops::{Range, RangeInclusive},
5 path::PathBuf,
6 sync::Arc,
7};
8
9use async_stream::stream;
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures::{Stream, stream::BoxStream};
13use object_store::{
14 CopyOptions, Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult,
15 MultipartUpload, ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions,
16 PutPayload, PutResult, Result, path::Path,
17};
18use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
19
20const CACHE_BLOCK_SIZE: u64 = 1024 * 1024 * 4; #[derive(Debug, Clone)]
24pub struct ByteCache {
25 inner: Arc<dyn ObjectStore>,
26 cache_dir: PathBuf,
27}
28
29impl ByteCache {
30 pub fn new(inner: Arc<dyn ObjectStore>, cache_dir: PathBuf) -> Self {
33 if !cache_dir.exists() {
34 fs::create_dir_all(&cache_dir).expect("Failed to create cache directory");
35 }
36 Self { inner, cache_dir }
37 }
38
39 fn get_cache_dir_for_path(&self, path: &Path) -> PathBuf {
41 let path_str = path.as_ref().replace("/", "_");
42 self.cache_dir.join(path_str)
43 }
44
45 fn get_chunk_path(&self, path: &Path, chunk_index: u64) -> PathBuf {
47 let cache_dir = self.get_cache_dir_for_path(path);
48 cache_dir.join(format!("chunk_{chunk_index}.bin"))
49 }
50
51 fn get_temp_chunk_path(&self, path: &Path, chunk_index: u64) -> PathBuf {
53 let cache_dir = self.get_cache_dir_for_path(path);
54 let now = std::time::SystemTime::now()
56 .duration_since(std::time::UNIX_EPOCH)
57 .unwrap_or_default()
58 .as_nanos();
59 let thread_id = std::thread::current().id();
60 cache_dir.join(format!(
61 "in-progress-chunk_{chunk_index}_{now}_{thread_id:?}.bin"
62 ))
63 }
64
65 fn chunks_for_range(&self, range: &Range<u64>) -> RangeInclusive<u64> {
67 let start_chunk = range.start / CACHE_BLOCK_SIZE;
68 let end_chunk = (range.end - 1) / CACHE_BLOCK_SIZE; start_chunk..=end_chunk
70 }
71
72 fn is_chunk_ready(&self, chunk_path: &std::path::Path) -> bool {
74 chunk_path.exists()
75 }
76
77 async fn read_from_cached_chunk(
79 &self,
80 chunk_path: PathBuf,
81 offset: u64,
82 len: usize,
83 ) -> Result<Bytes> {
84 let mut file = tokio::fs::File::open(&chunk_path)
85 .await
86 .map_err(|e| Error::Generic {
87 store: "ByteCache",
88 source: Box::new(e),
89 })?;
90
91 let mut buffer = vec![0u8; len];
92 file.seek(tokio::io::SeekFrom::Start(offset))
93 .await
94 .map_err(|e| Error::Generic {
95 store: "ByteCache",
96 source: Box::new(e),
97 })?;
98
99 file.read_exact(&mut buffer)
100 .await
101 .map_err(|e| Error::Generic {
102 store: "ByteCache",
103 source: Box::new(e),
104 })?;
105
106 Ok(Bytes::from(buffer))
107 }
108
109 async fn save_chunk(&self, path: &Path, chunk_index: u64, data: Bytes) -> Result<()> {
111 let cache_dir = self.get_cache_dir_for_path(path);
112 if !cache_dir.exists() {
113 fs::create_dir_all(&cache_dir).map_err(|e| Error::Generic {
114 store: "ByteCache",
115 source: Box::new(e),
116 })?;
117 }
118
119 let temp_chunk_path = self.get_temp_chunk_path(path, chunk_index);
121 let final_chunk_path = self.get_chunk_path(path, chunk_index);
122
123 let mut file = tokio::fs::File::create(&temp_chunk_path)
124 .await
125 .map_err(|e| Error::Generic {
126 store: "ByteCache",
127 source: Box::new(e),
128 })?;
129
130 file.write_all(&data).await.map_err(|e| Error::Generic {
132 store: "ByteCache",
133 source: Box::new(e),
134 })?;
135
136 file.sync_all().await.map_err(|e| Error::Generic {
138 store: "ByteCache",
139 source: Box::new(e),
140 })?;
141
142 drop(file);
144
145 match tokio::fs::rename(&temp_chunk_path, &final_chunk_path).await {
148 Ok(_) => Ok(()),
149 Err(e) => {
150 let _ = tokio::fs::remove_file(&temp_chunk_path).await;
151 if final_chunk_path.exists() {
152 Ok(())
153 } else {
154 Err(Error::Generic {
155 store: "ByteCache",
156 source: Box::new(e),
157 })
158 }
159 }
160 }
161 }
162
163 fn get_range_from_cache_stream(
165 &self,
166 location: &Path,
167 range: &Range<u64>,
168 ) -> impl Stream<Item = Result<Bytes>> + Send + 'static {
169 let this = self.clone();
170 let location = location.clone();
171 let range = range.clone();
172 stream! {
173 let chunks_needed = this.chunks_for_range(&range);
174 for chunk_idx in chunks_needed {
175 let chunk_path = this.get_chunk_path(&location, chunk_idx);
176 let chunk_start = chunk_idx * CACHE_BLOCK_SIZE;
177 let chunk_end = chunk_start + CACHE_BLOCK_SIZE;
178
179 let overlap_start = std::cmp::max(chunk_start, range.start);
180 let overlap_end = std::cmp::min(chunk_end, range.end);
181
182 if overlap_start < overlap_end {
183 let offset_in_chunk = overlap_start - chunk_start;
184 let length = overlap_end - overlap_start;
185
186 yield this
187 .read_from_cached_chunk(chunk_path, offset_in_chunk, length as usize)
188 .await;
189 }
190 }
191 }
192 }
193}
194
195impl Display for ByteCache {
196 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
197 write!(f, "ByteCache(cache_dir: {:?})", self.cache_dir)
198 }
199}
200
201#[async_trait]
202impl ObjectStore for ByteCache {
203 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
204 self.inner.list(prefix)
205 }
206
207 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
208 self.inner.list_with_delimiter(prefix).await
209 }
210
211 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
212 let meta = self.inner.head(location).await?;
214 let file_size = meta.size;
215
216 if options.head {
218 return self.inner.get_opts(location, options).await;
219 }
220
221 let range = match &options.range {
223 Some(GetRange::Bounded(range)) => range.clone(),
224 Some(GetRange::Suffix(suffix)) => (file_size.saturating_sub(*suffix))..file_size,
225 Some(GetRange::Offset(offset)) => *offset..file_size,
226 None => 0..file_size,
227 };
228
229 let chunks_needed = self.chunks_for_range(&range);
231
232 let mut missing_chunks = Vec::new();
234 for chunk_idx in chunks_needed {
235 let chunk_path = self.get_chunk_path(location, chunk_idx);
236 if !self.is_chunk_ready(&chunk_path) {
237 missing_chunks.push(chunk_idx);
238 }
239 }
240
241 for chunk_idx in missing_chunks {
243 let chunk_start = chunk_idx * CACHE_BLOCK_SIZE;
244 let chunk_end = std::cmp::min(chunk_start + CACHE_BLOCK_SIZE, file_size);
245
246 let chunk_range = GetRange::Bounded(chunk_start..chunk_end);
247 let chunk_options = GetOptions {
248 range: Some(chunk_range),
249 ..options.clone()
250 };
251
252 let chunk_result = self.inner.get_opts(location, chunk_options).await?;
253 let chunk_data = chunk_result.bytes().await?;
254
255 self.save_chunk(location, chunk_idx, chunk_data).await?;
257 }
258
259 Ok(GetResult {
261 payload: GetResultPayload::Stream(Box::pin(
262 self.get_range_from_cache_stream(location, &range),
263 )),
264 meta,
265 range,
266 attributes: Default::default(),
267 })
268 }
269
270 async fn put_opts(
271 &self,
272 _location: &Path,
273 _payload: PutPayload,
274 _opts: PutOptions,
275 ) -> Result<PutResult> {
276 unreachable!("ByteCache does not support put")
277 }
278
279 async fn put_multipart_opts(
280 &self,
281 _location: &Path,
282 _opts: PutMultipartOptions,
283 ) -> Result<Box<dyn MultipartUpload>> {
284 unreachable!("ByteCache does not support multipart upload")
285 }
286
287 fn delete_stream(
288 &self,
289 _locations: BoxStream<'static, Result<Path>>,
290 ) -> BoxStream<'static, Result<Path>> {
291 unreachable!("ByteCache does not support delete")
292 }
293
294 async fn copy_opts(&self, _from: &Path, _to: &Path, _options: CopyOptions) -> Result<()> {
295 unreachable!("ByteCache does not support copy")
296 }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use liquid_cache_common::mock_store::MockStore;
    use object_store::memory::InMemory;
    use std::ops::Range;
    use tempfile::tempdir;

    /// Byte that generated test files hold at absolute `offset`.
    fn expected_byte(offset: u64) -> u8 {
        (offset % 256) as u8
    }

    /// Creates an in-memory store and a cache over it in a fresh temp directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the cache is used.
    fn new_cache() -> (Arc<InMemory>, tempfile::TempDir, ByteCache) {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());
        (inner, temp_dir, cache)
    }

    /// Reports whether chunk `chunk_idx` of `file_path` is present in `cache`.
    fn is_cached(cache: &ByteCache, file_path: &str, chunk_idx: u64) -> bool {
        cache.is_chunk_ready(&cache.get_chunk_path(&Path::from(file_path), chunk_idx))
    }

    /// Stores `size` bytes at `path`, where byte `i` has the value `i % 256`.
    async fn create_test_file(store: &InMemory, path: &str, size: u64) -> Result<()> {
        let data: Vec<u8> = (0..size).map(expected_byte).collect();
        store.put(&Path::from(path), Bytes::from(data).into()).await?;
        Ok(())
    }

    /// Reads `range` of `path` through `store` and checks both the length and
    /// every byte against the `i % 256` pattern.
    async fn verify_range(store: &dyn ObjectStore, path: &str, range: Range<u64>) -> Result<()> {
        let result = store.get_range(&Path::from(path), range.clone()).await?;

        // Without this check an empty or truncated result would pass the
        // per-byte loop below vacuously.
        assert_eq!(
            result.len() as u64,
            range.end - range.start,
            "Unexpected number of bytes for range {range:?}"
        );

        for (i, byte) in result.iter().enumerate() {
            let expected = expected_byte(range.start + i as u64);
            assert_eq!(*byte, expected, "Mismatch at position {i}");
        }

        Ok(())
    }

    /// A file smaller than one chunk is cached as a single chunk.
    #[tokio::test]
    async fn test_small_file() -> Result<()> {
        let (inner, _temp_dir, cache) = new_cache();

        let file_path = "small_file.bin";
        create_test_file(&inner, file_path, 10 * 1024).await?;

        verify_range(&cache, file_path, 0..10 * 1024).await?;

        let cache_dir = cache.get_cache_dir_for_path(&Path::from(file_path));
        assert!(cache_dir.exists(), "Cache directory should exist");
        assert!(is_cached(&cache, file_path, 0), "Chunk file should be ready");

        Ok(())
    }

    /// Reading a file spanning three chunks caches all three.
    #[tokio::test]
    async fn test_large_file() -> Result<()> {
        let (inner, _temp_dir, cache) = new_cache();

        let file_path = "large_file.bin";
        let file_size = CACHE_BLOCK_SIZE * 2 + 1024 * 1024;
        create_test_file(&inner, file_path, file_size).await?;

        verify_range(&cache, file_path, 0..file_size).await?;

        for chunk_idx in 0..=2 {
            assert!(
                is_cached(&cache, file_path, chunk_idx),
                "Chunk {chunk_idx} should be ready"
            );
        }

        Ok(())
    }

    /// A range inside one chunk caches only that chunk.
    #[tokio::test]
    async fn test_range_within_chunk() -> Result<()> {
        let (inner, _temp_dir, cache) = new_cache();

        let file_path = "range_test.bin";
        let file_size = CACHE_BLOCK_SIZE * 3;
        create_test_file(&inner, file_path, file_size).await?;

        let start = CACHE_BLOCK_SIZE + 1024;
        let end = CACHE_BLOCK_SIZE + 2048;
        verify_range(&cache, file_path, start..end).await?;

        assert!(is_cached(&cache, file_path, 1), "Chunk 1 should be ready");
        assert!(
            !is_cached(&cache, file_path, 0),
            "Chunk 0 should not be ready yet"
        );
        assert!(
            !is_cached(&cache, file_path, 2),
            "Chunk 2 should not be ready yet"
        );

        Ok(())
    }

    /// A range touching three chunks caches all of them.
    #[tokio::test]
    async fn test_range_across_chunks() -> Result<()> {
        let (inner, _temp_dir, cache) = new_cache();

        let file_path = "multi_chunk_range.bin";
        let file_size = CACHE_BLOCK_SIZE * 3;
        create_test_file(&inner, file_path, file_size).await?;

        let start = CACHE_BLOCK_SIZE - 1024;
        let end = CACHE_BLOCK_SIZE * 2 + 1024;
        verify_range(&cache, file_path, start..end).await?;

        assert!(is_cached(&cache, file_path, 0), "Chunk 0 should be ready");
        assert!(is_cached(&cache, file_path, 1), "Chunk 1 should be ready");
        assert!(is_cached(&cache, file_path, 2), "Chunk 2 should be ready");

        Ok(())
    }

    /// Once cached, data is served from disk even after the source changes.
    #[tokio::test]
    async fn test_cache_hit() -> Result<()> {
        let (inner, _temp_dir, cache) = new_cache();

        let file_path = "cache_hit.bin";
        let file_size = CACHE_BLOCK_SIZE + 1024;
        create_test_file(&inner, file_path, file_size).await?;

        verify_range(&cache, file_path, 0..file_size).await?;

        // Overwrite the source; the cached pattern must still be returned.
        let modified_data = vec![255u8; file_size as usize];
        inner
            .put(&Path::from(file_path), Bytes::from(modified_data).into())
            .await?;

        verify_range(&cache, file_path, 0..file_size).await?;

        Ok(())
    }

    /// A suffix range returns the last bytes of the object.
    #[tokio::test]
    async fn test_suffix_range() -> Result<()> {
        let (inner, _temp_dir, cache) = new_cache();

        let file_path = "suffix_range.bin";
        let file_size = CACHE_BLOCK_SIZE * 2;
        create_test_file(&inner, file_path, file_size).await?;

        let options = GetOptions {
            range: Some(GetRange::Suffix(1024 * 1024)),
            ..Default::default()
        };

        let data = cache
            .get_opts(&Path::from(file_path), options)
            .await?
            .bytes()
            .await?;

        assert_eq!(data.len(), 1024 * 1024);

        let start = file_size - 1024 * 1024;
        for (i, byte) in data.iter().enumerate() {
            let expected = expected_byte(start + i as u64);
            assert_eq!(*byte, expected, "Mismatch at position {i}");
        }

        Ok(())
    }

    /// Chunks written by one cache instance are reused by a later instance
    /// that points at the same directory.
    #[tokio::test]
    async fn test_persistent_cache() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache_dir_path = temp_dir.path().to_path_buf();

        let file_path = "persistent_test.bin";
        let file_size = CACHE_BLOCK_SIZE + 1024;
        create_test_file(&inner, file_path, file_size).await?;

        {
            let first_cache = ByteCache::new(inner.clone(), cache_dir_path.clone());
            verify_range(&first_cache, file_path, 0..file_size).await?;
            assert!(
                is_cached(&first_cache, file_path, 0),
                "First chunk should be cached"
            );
        }

        // Overwrite the source; the second cache must still see the old data.
        let modified_data = vec![255u8; file_size as usize];
        inner
            .put(&Path::from(file_path), Bytes::from(modified_data).into())
            .await?;

        let second_cache = ByteCache::new(inner.clone(), cache_dir_path);

        verify_range(&second_cache, file_path, 0..file_size).await?;

        let mid_range = file_size / 2;
        verify_range(&second_cache, file_path, mid_range..mid_range + 1024).await?;

        Ok(())
    }

    /// The wrapped store is asked for whole chunks, and only once per chunk.
    #[tokio::test]
    async fn test_object_store_metrics() -> Result<()> {
        let inner = Arc::new(MockStore::new_with_files(
            1,
            (CACHE_BLOCK_SIZE * 3) as usize,
        ));
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        let path = Path::from("0.parquet");
        let start = CACHE_BLOCK_SIZE / 2;
        let end = CACHE_BLOCK_SIZE + CACHE_BLOCK_SIZE / 2;

        verify_range(&cache, path.as_ref(), start..end).await?;

        let ranges = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges[0], 0..CACHE_BLOCK_SIZE);
        assert_eq!(ranges[1], CACHE_BLOCK_SIZE..CACHE_BLOCK_SIZE * 2);

        // A second read inside the same chunks must not reach the inner store.
        verify_range(&cache, path.as_ref(), start + 1024..end - 1024).await?;
        let ranges_after = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges_after.len(), 2);

        Ok(())
    }
}