1use std::{
2 fmt::{Display, Formatter},
3 fs,
4 ops::{Range, RangeInclusive},
5 path::PathBuf,
6 sync::Arc,
7};
8
9use async_stream::stream;
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures::{Stream, stream::BoxStream};
13use object_store::{
14 Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
15 ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
16 path::Path,
17};
18use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
19
20const CACHE_BLOCK_SIZE: u64 = 1024 * 1024 * 4; #[derive(Debug, Clone)]
24pub struct ByteCache {
25 inner: Arc<dyn ObjectStore>,
26 cache_dir: PathBuf,
27}
28
29impl ByteCache {
30 pub fn new(inner: Arc<dyn ObjectStore>, cache_dir: PathBuf) -> Self {
33 if !cache_dir.exists() {
34 fs::create_dir_all(&cache_dir).expect("Failed to create cache directory");
35 }
36 Self { inner, cache_dir }
37 }
38
39 fn get_cache_dir_for_path(&self, path: &Path) -> PathBuf {
41 let path_str = path.as_ref().replace("/", "_");
42 self.cache_dir.join(path_str)
43 }
44
45 fn get_chunk_path(&self, path: &Path, chunk_index: u64) -> PathBuf {
47 let cache_dir = self.get_cache_dir_for_path(path);
48 cache_dir.join(format!("chunk_{chunk_index}.bin"))
49 }
50
51 fn get_temp_chunk_path(&self, path: &Path, chunk_index: u64) -> PathBuf {
53 let cache_dir = self.get_cache_dir_for_path(path);
54 let now = std::time::SystemTime::now()
56 .duration_since(std::time::UNIX_EPOCH)
57 .unwrap_or_default()
58 .as_nanos();
59 let thread_id = std::thread::current().id();
60 cache_dir.join(format!(
61 "in-progress-chunk_{chunk_index}_{now}_{thread_id:?}.bin"
62 ))
63 }
64
65 fn chunks_for_range(&self, range: &Range<u64>) -> RangeInclusive<u64> {
67 let start_chunk = range.start / CACHE_BLOCK_SIZE;
68 let end_chunk = (range.end - 1) / CACHE_BLOCK_SIZE; start_chunk..=end_chunk
70 }
71
72 fn is_chunk_ready(&self, chunk_path: &std::path::Path) -> bool {
74 chunk_path.exists()
75 }
76
77 async fn read_from_cached_chunk(
79 &self,
80 chunk_path: PathBuf,
81 offset: u64,
82 len: usize,
83 ) -> Result<Bytes> {
84 let mut file = tokio::fs::File::open(&chunk_path)
85 .await
86 .map_err(|e| Error::Generic {
87 store: "ByteCache",
88 source: Box::new(e),
89 })?;
90
91 let mut buffer = vec![0u8; len];
92 file.seek(tokio::io::SeekFrom::Start(offset))
93 .await
94 .map_err(|e| Error::Generic {
95 store: "ByteCache",
96 source: Box::new(e),
97 })?;
98
99 file.read_exact(&mut buffer)
100 .await
101 .map_err(|e| Error::Generic {
102 store: "ByteCache",
103 source: Box::new(e),
104 })?;
105
106 Ok(Bytes::from(buffer))
107 }
108
109 async fn save_chunk(&self, path: &Path, chunk_index: u64, data: Bytes) -> Result<()> {
111 let cache_dir = self.get_cache_dir_for_path(path);
112 if !cache_dir.exists() {
113 fs::create_dir_all(&cache_dir).map_err(|e| Error::Generic {
114 store: "ByteCache",
115 source: Box::new(e),
116 })?;
117 }
118
119 let temp_chunk_path = self.get_temp_chunk_path(path, chunk_index);
121 let final_chunk_path = self.get_chunk_path(path, chunk_index);
122
123 let mut file = tokio::fs::File::create(&temp_chunk_path)
124 .await
125 .map_err(|e| Error::Generic {
126 store: "ByteCache",
127 source: Box::new(e),
128 })?;
129
130 file.write_all(&data).await.map_err(|e| Error::Generic {
132 store: "ByteCache",
133 source: Box::new(e),
134 })?;
135
136 file.sync_all().await.map_err(|e| Error::Generic {
138 store: "ByteCache",
139 source: Box::new(e),
140 })?;
141
142 drop(file);
144
145 match tokio::fs::rename(&temp_chunk_path, &final_chunk_path).await {
148 Ok(_) => Ok(()),
149 Err(e) => {
150 let _ = tokio::fs::remove_file(&temp_chunk_path).await;
151 if final_chunk_path.exists() {
152 Ok(())
153 } else {
154 Err(Error::Generic {
155 store: "ByteCache",
156 source: Box::new(e),
157 })
158 }
159 }
160 }
161 }
162
163 fn get_range_from_cache_stream(
165 &self,
166 location: &Path,
167 range: &Range<u64>,
168 ) -> impl Stream<Item = Result<Bytes>> + Send + 'static {
169 let this = self.clone();
170 let location = location.clone();
171 let range = range.clone();
172 stream! {
173 let chunks_needed = this.chunks_for_range(&range);
174 for chunk_idx in chunks_needed {
175 let chunk_path = this.get_chunk_path(&location, chunk_idx);
176 let chunk_start = chunk_idx * CACHE_BLOCK_SIZE;
177 let chunk_end = chunk_start + CACHE_BLOCK_SIZE;
178
179 let overlap_start = std::cmp::max(chunk_start, range.start);
180 let overlap_end = std::cmp::min(chunk_end, range.end);
181
182 if overlap_start < overlap_end {
183 let offset_in_chunk = overlap_start - chunk_start;
184 let length = overlap_end - overlap_start;
185
186 yield this
187 .read_from_cached_chunk(chunk_path, offset_in_chunk, length as usize)
188 .await;
189 }
190 }
191 }
192 }
193}
194
195impl Display for ByteCache {
196 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
197 write!(f, "ByteCache(cache_dir: {:?})", self.cache_dir)
198 }
199}
200
201#[async_trait]
202impl ObjectStore for ByteCache {
203 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
204 self.inner.list(prefix)
205 }
206
207 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
208 self.inner.list_with_delimiter(prefix).await
209 }
210
211 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
212 let meta = self.inner.head(location).await?;
214 let file_size = meta.size;
215
216 if options.head {
218 return self.inner.get_opts(location, options).await;
219 }
220
221 let range = match &options.range {
223 Some(GetRange::Bounded(range)) => range.clone(),
224 Some(GetRange::Suffix(suffix)) => (file_size.saturating_sub(*suffix))..file_size,
225 Some(GetRange::Offset(offset)) => *offset..file_size,
226 None => 0..file_size,
227 };
228
229 let chunks_needed = self.chunks_for_range(&range);
231
232 let mut missing_chunks = Vec::new();
234 for chunk_idx in chunks_needed {
235 let chunk_path = self.get_chunk_path(location, chunk_idx);
236 if !self.is_chunk_ready(&chunk_path) {
237 missing_chunks.push(chunk_idx);
238 }
239 }
240
241 for chunk_idx in missing_chunks {
243 let chunk_start = chunk_idx * CACHE_BLOCK_SIZE;
244 let chunk_end = std::cmp::min(chunk_start + CACHE_BLOCK_SIZE, file_size);
245
246 let chunk_range = GetRange::Bounded(chunk_start..chunk_end);
247 let chunk_options = GetOptions {
248 range: Some(chunk_range),
249 ..options.clone()
250 };
251
252 let chunk_result = self.inner.get_opts(location, chunk_options).await?;
253 let chunk_data = chunk_result.bytes().await?;
254
255 self.save_chunk(location, chunk_idx, chunk_data).await?;
257 }
258
259 Ok(GetResult {
261 payload: GetResultPayload::Stream(Box::pin(
262 self.get_range_from_cache_stream(location, &range),
263 )),
264 meta,
265 range,
266 attributes: Default::default(),
267 })
268 }
269
270 async fn put_opts(
271 &self,
272 _location: &Path,
273 _payload: PutPayload,
274 _opts: PutOptions,
275 ) -> Result<PutResult> {
276 unreachable!("ByteCache does not support put")
277 }
278
279 async fn put_multipart_opts(
280 &self,
281 _location: &Path,
282 _opts: PutMultipartOptions,
283 ) -> Result<Box<dyn MultipartUpload>> {
284 unreachable!("ByteCache does not support multipart upload")
285 }
286
287 async fn delete(&self, _location: &Path) -> Result<()> {
288 unreachable!("ByteCache does not support delete")
289 }
290
291 async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
292 unreachable!("ByteCache does not support copy")
293 }
294
295 async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
296 unreachable!("ByteCache does not support copy_if_not_exists")
297 }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use liquid_cache_common::mock_store::MockStore;
    use object_store::memory::InMemory;
    use std::ops::Range;
    use tempfile::tempdir;

    /// Stores an object of `size` bytes at `path` whose byte at offset `i` is
    /// `i % 256`, so any slice can be validated from its offset alone.
    async fn create_test_file(store: &InMemory, path: &str, size: u64) -> Result<()> {
        let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();

        let path = Path::from(path);
        store.put(&path, Bytes::from(data).into()).await?;
        Ok(())
    }

    /// Reads `range` of `path` through `store` and asserts that the result has
    /// the requested length and follows the `offset % 256` pattern.
    async fn verify_range(store: &dyn ObjectStore, path: &str, range: Range<u64>) -> Result<()> {
        let path = Path::from(path);
        let result = store.get_range(&path, range.clone()).await?;

        // Without this check an empty or truncated result would pass the
        // per-byte loop below vacuously.
        assert_eq!(
            result.len() as u64,
            range.end - range.start,
            "Unexpected length for range {range:?}"
        );

        for (i, byte) in result.iter().enumerate() {
            let expected = ((range.start + i as u64) % 256) as u8;
            assert_eq!(*byte, expected, "Mismatch at position {i}");
        }

        Ok(())
    }

    /// An object smaller than one chunk is cached as a single chunk file.
    #[tokio::test]
    async fn test_small_file() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        let file_path = "small_file.bin";
        create_test_file(&inner, file_path, 10 * 1024).await?;

        verify_range(&cache, file_path, 0..10 * 1024).await?;

        let cache_dir = cache.get_cache_dir_for_path(&Path::from(file_path));
        assert!(cache_dir.exists(), "Cache directory should exist");

        let chunk_path = cache.get_chunk_path(&Path::from(file_path), 0);
        assert!(
            cache.is_chunk_ready(&chunk_path),
            "Chunk file should be ready"
        );

        Ok(())
    }

    /// Reading a whole object spanning three chunks caches all three.
    #[tokio::test]
    async fn test_large_file() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        let file_path = "large_file.bin";
        // Two full chunks plus a partial third one.
        let file_size = CACHE_BLOCK_SIZE * 2 + 1024 * 1024;
        create_test_file(&inner, file_path, file_size).await?;

        verify_range(&cache, file_path, 0..file_size).await?;

        for chunk_idx in 0..=2 {
            let chunk_path = cache.get_chunk_path(&Path::from(file_path), chunk_idx);
            assert!(
                cache.is_chunk_ready(&chunk_path),
                "Chunk {chunk_idx} should be ready"
            );
        }

        Ok(())
    }

    /// A range inside chunk 1 caches only chunk 1.
    #[tokio::test]
    async fn test_range_within_chunk() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        let file_path = "range_test.bin";
        let file_size = CACHE_BLOCK_SIZE * 3;
        create_test_file(&inner, file_path, file_size).await?;

        let start = CACHE_BLOCK_SIZE + 1024;
        let end = CACHE_BLOCK_SIZE + 2048;
        verify_range(&cache, file_path, start..end).await?;

        let chunk1_path = cache.get_chunk_path(&Path::from(file_path), 1);
        assert!(
            cache.is_chunk_ready(&chunk1_path),
            "Chunk 1 should be ready"
        );

        let chunk0_path = cache.get_chunk_path(&Path::from(file_path), 0);
        let chunk2_path = cache.get_chunk_path(&Path::from(file_path), 2);
        assert!(
            !cache.is_chunk_ready(&chunk0_path),
            "Chunk 0 should not be ready yet"
        );
        assert!(
            !cache.is_chunk_ready(&chunk2_path),
            "Chunk 2 should not be ready yet"
        );

        Ok(())
    }

    /// A range touching chunks 0, 1 and 2 caches all of them.
    #[tokio::test]
    async fn test_range_across_chunks() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        let file_path = "multi_chunk_range.bin";
        let file_size = CACHE_BLOCK_SIZE * 3;
        create_test_file(&inner, file_path, file_size).await?;

        let start = CACHE_BLOCK_SIZE - 1024;
        let end = CACHE_BLOCK_SIZE * 2 + 1024;
        verify_range(&cache, file_path, start..end).await?;

        let chunk0_path = cache.get_chunk_path(&Path::from(file_path), 0);
        let chunk1_path = cache.get_chunk_path(&Path::from(file_path), 1);
        let chunk2_path = cache.get_chunk_path(&Path::from(file_path), 2);
        assert!(
            cache.is_chunk_ready(&chunk0_path),
            "Chunk 0 should be ready"
        );
        assert!(
            cache.is_chunk_ready(&chunk1_path),
            "Chunk 1 should be ready"
        );
        assert!(
            cache.is_chunk_ready(&chunk2_path),
            "Chunk 2 should be ready"
        );

        Ok(())
    }

    /// Once cached, reads return the cached bytes even after the object in the
    /// inner store has been overwritten.
    #[tokio::test]
    async fn test_cache_hit() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        let file_path = "cache_hit.bin";
        let file_size = CACHE_BLOCK_SIZE + 1024;
        create_test_file(&inner, file_path, file_size).await?;

        verify_range(&cache, file_path, 0..file_size).await?;

        // Overwrite the backing object; the pattern check below can only pass
        // if the data comes from the cache.
        let modified_data = vec![255u8; file_size as usize];
        let path = Path::from(file_path);
        inner.put(&path, Bytes::from(modified_data).into()).await?;

        verify_range(&cache, file_path, 0..file_size).await?;

        Ok(())
    }

    /// A suffix range returns the last bytes of the object.
    #[tokio::test]
    async fn test_suffix_range() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        let file_path = "suffix_range.bin";
        let file_size = CACHE_BLOCK_SIZE * 2;
        create_test_file(&inner, file_path, file_size).await?;

        let path = Path::from(file_path);
        let options = GetOptions {
            range: Some(GetRange::Suffix(1024 * 1024)),
            ..Default::default()
        };

        let result = cache.get_opts(&path, options).await?;
        let data = result.bytes().await?;

        assert_eq!(data.len(), 1024 * 1024);

        let start = file_size - 1024 * 1024;
        for (i, byte) in data.iter().enumerate() {
            let expected = ((start + i as u64) % 256) as u8;
            assert_eq!(*byte, expected, "Mismatch at position {i}");
        }

        Ok(())
    }

    /// Chunks written by one `ByteCache` are reused by a second instance that
    /// points at the same directory.
    #[tokio::test]
    async fn test_persistent_cache() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache_dir_path = temp_dir.path().to_path_buf();

        let file_path = "persistent_test.bin";
        let file_size = CACHE_BLOCK_SIZE + 1024;
        create_test_file(&inner, file_path, file_size).await?;

        {
            let first_cache = ByteCache::new(inner.clone(), cache_dir_path.clone());
            verify_range(&first_cache, file_path, 0..file_size).await?;

            let chunk_path = first_cache.get_chunk_path(&Path::from(file_path), 0);
            assert!(
                first_cache.is_chunk_ready(&chunk_path),
                "First chunk should be cached"
            );
        }

        // Overwrite the backing object so only cached data matches the pattern.
        let modified_data = vec![255u8; file_size as usize];
        let path = Path::from(file_path);
        inner.put(&path, Bytes::from(modified_data).into()).await?;

        let second_cache = ByteCache::new(inner.clone(), cache_dir_path);

        verify_range(&second_cache, file_path, 0..file_size).await?;

        let mid_range = file_size / 2;
        verify_range(&second_cache, file_path, mid_range..mid_range + 1024).await?;

        Ok(())
    }

    /// The inner store is asked for whole chunks, and only once per chunk.
    #[tokio::test]
    async fn test_object_store_metrics() -> Result<()> {
        let inner = Arc::new(MockStore::new_with_files(
            1,
            (CACHE_BLOCK_SIZE * 3) as usize,
        ));
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        let path = Path::from("0.parquet");
        let start = CACHE_BLOCK_SIZE / 2;
        let end = CACHE_BLOCK_SIZE + CACHE_BLOCK_SIZE / 2;

        verify_range(&cache, path.as_ref(), start..end).await?;

        let ranges = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges[0], 0..CACHE_BLOCK_SIZE);
        assert_eq!(ranges[1], CACHE_BLOCK_SIZE..CACHE_BLOCK_SIZE * 2);

        // A second read inside the already cached chunks must not reach the
        // inner store again.
        verify_range(&cache, path.as_ref(), start + 1024..end - 1024).await?;
        let ranges_after = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges_after.len(), 2);

        Ok(())
    }
}