liquid_cache_storage/
byte_cache.rs

use std::{
    fmt::{Display, Formatter},
    fs,
    ops::{Range, RangeInclusive},
    path::PathBuf,
    sync::Arc,
};

use async_stream::stream;
use async_trait::async_trait;
use bytes::Bytes;
use futures::{Stream, stream::BoxStream};
use object_store::{
    Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
    ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
    path::Path,
};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

const CACHE_BLOCK_SIZE: u64 = 1024 * 1024 * 4; // 4MB

/// Byte cache for liquid cache; acts like an object store.
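///
/// # Example
///
/// A minimal usage sketch (the `InMemory` store and the cache path below are
/// illustrative assumptions, not prescribed by this module):
///
/// ```ignore
/// use std::{path::PathBuf, sync::Arc};
/// use object_store::{ObjectStore, memory::InMemory};
///
/// let inner: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// let cache = ByteCache::new(inner, PathBuf::from("/tmp/liquid_byte_cache"));
/// // `cache` implements `ObjectStore`; reads are served from 4 MiB chunk files on disk.
/// ```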
#[derive(Debug, Clone)]
pub struct ByteCache {
    inner: Arc<dyn ObjectStore>,
    cache_dir: PathBuf,
}

impl ByteCache {
    /// Create a new byte cache. `cache_dir` is the directory used to store the cached files.
    /// `ByteCache` can read from a previously initialized cache directory.
    pub fn new(inner: Arc<dyn ObjectStore>, cache_dir: PathBuf) -> Self {
        if !cache_dir.exists() {
            fs::create_dir_all(&cache_dir).expect("Failed to create cache directory");
        }
        Self { inner, cache_dir }
    }

    /// Convert an object path to its cache directory path
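    /// (e.g. `a/b/c.parquet` is cached under `<cache_dir>/a_b_c.parquet/`)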
    fn get_cache_dir_for_path(&self, path: &Path) -> PathBuf {
        let path_str = path.as_ref().replace("/", "_");
        self.cache_dir.join(path_str)
    }

    /// Get the path for a specific chunk of a file
    fn get_chunk_path(&self, path: &Path, chunk_index: u64) -> PathBuf {
        let cache_dir = self.get_cache_dir_for_path(path);
        cache_dir.join(format!("chunk_{chunk_index}.bin"))
    }

    /// Get the temporary path for a chunk while it's being written
    fn get_temp_chunk_path(&self, path: &Path, chunk_index: u64) -> PathBuf {
        let cache_dir = self.get_cache_dir_for_path(path);
        // Include timestamp and thread ID to avoid conflicts between concurrent writes
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        let thread_id = std::thread::current().id();
        cache_dir.join(format!(
            "in-progress-chunk_{chunk_index}_{now}_{thread_id:?}.bin"
        ))
    }

    /// Calculate which chunks are needed for a given range
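    /// For example, with the 4 MiB block size, the byte range `3_000_000..9_000_000`
    /// maps to chunks `0..=2` (chunk 0 covers bytes 0..4 MiB, chunk 2 covers 8..12 MiB).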
    fn chunks_for_range(&self, range: &Range<u64>) -> RangeInclusive<u64> {
        let start_chunk = range.start / CACHE_BLOCK_SIZE;
        let end_chunk = (range.end - 1) / CACHE_BLOCK_SIZE; // -1 because end is exclusive
        start_chunk..=end_chunk
    }

    /// Check if a cached chunk is ready to be read (file exists, since rename is atomic)
    fn is_chunk_ready(&self, chunk_path: &std::path::Path) -> bool {
        chunk_path.exists()
    }

    /// Read data from a cached chunk
    async fn read_from_cached_chunk(
        &self,
        chunk_path: PathBuf,
        offset: u64,
        len: usize,
    ) -> Result<Bytes> {
        let mut file = tokio::fs::File::open(&chunk_path)
            .await
            .map_err(|e| Error::Generic {
                store: "ByteCache",
                source: Box::new(e),
            })?;

        let mut buffer = vec![0u8; len];
        file.seek(tokio::io::SeekFrom::Start(offset))
            .await
            .map_err(|e| Error::Generic {
                store: "ByteCache",
                source: Box::new(e),
            })?;

        file.read_exact(&mut buffer)
            .await
            .map_err(|e| Error::Generic {
                store: "ByteCache",
                source: Box::new(e),
            })?;

        Ok(Bytes::from(buffer))
    }

    /// Save a chunk to the cache
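    ///
    /// The data is first written to a uniquely named temporary file and then
    /// atomically renamed into place, so readers never observe a partially
    /// written chunk and concurrent writers of the same chunk are harmless.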
    async fn save_chunk(&self, path: &Path, chunk_index: u64, data: Bytes) -> Result<()> {
        let cache_dir = self.get_cache_dir_for_path(path);
        if !cache_dir.exists() {
            fs::create_dir_all(&cache_dir).map_err(|e| Error::Generic {
                store: "ByteCache",
                source: Box::new(e),
            })?;
        }

        // Write to a temporary file first
        let temp_chunk_path = self.get_temp_chunk_path(path, chunk_index);
        let final_chunk_path = self.get_chunk_path(path, chunk_index);

        let mut file = tokio::fs::File::create(&temp_chunk_path)
            .await
            .map_err(|e| Error::Generic {
                store: "ByteCache",
                source: Box::new(e),
            })?;

        // Write the data to the temporary file
        file.write_all(&data).await.map_err(|e| Error::Generic {
            store: "ByteCache",
            source: Box::new(e),
        })?;

        // Sync to ensure data is written to disk before rename
        file.sync_all().await.map_err(|e| Error::Generic {
            store: "ByteCache",
            source: Box::new(e),
        })?;

        // Close the file before rename
        drop(file);

        // Atomically rename the temporary file to the final location
        // If another thread already created the file, that's fine - we can just clean up our temp file
        match tokio::fs::rename(&temp_chunk_path, &final_chunk_path).await {
            Ok(_) => Ok(()),
            Err(e) => {
                let _ = tokio::fs::remove_file(&temp_chunk_path).await;
                if final_chunk_path.exists() {
                    Ok(())
                } else {
                    Err(Error::Generic {
                        store: "ByteCache",
                        source: Box::new(e),
                    })
                }
            }
        }
    }

    /// Get range data from cached chunks
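    ///
    /// The returned stream yields one `Bytes` item per chunk overlapping `range`,
    /// each clipped to the portion of that chunk that falls inside the requested range.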
    fn get_range_from_cache_stream(
        &self,
        location: &Path,
        range: &Range<u64>,
    ) -> impl Stream<Item = Result<Bytes>> + Send + 'static {
        let this = self.clone();
        let location = location.clone();
        let range = range.clone();
        stream! {
            let chunks_needed = this.chunks_for_range(&range);
            for chunk_idx in chunks_needed {
                let chunk_path = this.get_chunk_path(&location, chunk_idx);
                let chunk_start = chunk_idx * CACHE_BLOCK_SIZE;
                let chunk_end = chunk_start + CACHE_BLOCK_SIZE;

                let overlap_start = std::cmp::max(chunk_start, range.start);
                let overlap_end = std::cmp::min(chunk_end, range.end);

                if overlap_start < overlap_end {
                    let offset_in_chunk = overlap_start - chunk_start;
                    let length = overlap_end - overlap_start;

                    yield this
                        .read_from_cached_chunk(chunk_path, offset_in_chunk, length as usize)
                        .await;
                }
            }
        }
    }
}

impl Display for ByteCache {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "ByteCache(cache_dir: {:?})", self.cache_dir)
    }
}

#[async_trait]
impl ObjectStore for ByteCache {
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
        self.inner.list(prefix)
    }

    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        self.inner.list_with_delimiter(prefix).await
    }

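    // Read path: resolve the requested range into `CACHE_BLOCK_SIZE` chunks, fetch any
    // chunk that is not yet on disk from the inner store and persist it, then stream
    // the requested bytes back from the local chunk files.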
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        // Get the metadata first to know the file size
        let meta = self.inner.head(location).await?;
        let file_size = meta.size;

        // If this is just a HEAD request, return the metadata
        if options.head {
            return self.inner.get_opts(location, options).await;
        }

        // Determine the range we need to fetch
        let range = match &options.range {
            Some(GetRange::Bounded(range)) => range.clone(),
            Some(GetRange::Suffix(suffix)) => (file_size.saturating_sub(*suffix))..file_size,
            Some(GetRange::Offset(offset)) => *offset..file_size,
            None => 0..file_size,
        };

        // Calculate which chunks we need
        let chunks_needed = self.chunks_for_range(&range);

        // Check which chunks are already cached
        let mut missing_chunks = Vec::new();
        for chunk_idx in chunks_needed {
            let chunk_path = self.get_chunk_path(location, chunk_idx);
            if !self.is_chunk_ready(&chunk_path) {
                missing_chunks.push(chunk_idx);
            }
        }

        // Fetch missing chunks from the underlying store
        for chunk_idx in missing_chunks {
            let chunk_start = chunk_idx * CACHE_BLOCK_SIZE;
            let chunk_end = std::cmp::min(chunk_start + CACHE_BLOCK_SIZE, file_size);

            let chunk_range = GetRange::Bounded(chunk_start..chunk_end);
            let chunk_options = GetOptions {
                range: Some(chunk_range),
                ..options.clone()
            };

            let chunk_result = self.inner.get_opts(location, chunk_options).await?;
            let chunk_data = chunk_result.bytes().await?;

            // Save the chunk to cache
            self.save_chunk(location, chunk_idx, chunk_data).await?;
        }

        // Return a GetResult with the stream of bytes from cache
        Ok(GetResult {
            payload: GetResultPayload::Stream(Box::pin(
                self.get_range_from_cache_stream(location, &range),
            )),
            meta,
            range,
            attributes: Default::default(),
        })
    }

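    // ByteCache is a read-through cache: the mutating operations below are
    // intentionally unsupported.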
    async fn put_opts(
        &self,
        _location: &Path,
        _payload: PutPayload,
        _opts: PutOptions,
    ) -> Result<PutResult> {
        unreachable!("ByteCache does not support put")
    }

    async fn put_multipart_opts(
        &self,
        _location: &Path,
        _opts: PutMultipartOptions,
    ) -> Result<Box<dyn MultipartUpload>> {
        unreachable!("ByteCache does not support multipart upload")
    }

    async fn delete(&self, _location: &Path) -> Result<()> {
        unreachable!("ByteCache does not support delete")
    }

    async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
        unreachable!("ByteCache does not support copy")
    }

    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
        unreachable!("ByteCache does not support copy_if_not_exists")
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use liquid_cache_common::mock_store::MockStore;
    use object_store::memory::InMemory;
    use std::ops::Range;
    use tempfile::tempdir;

    // Helper function to create a test file of specified size in the in-memory store
    async fn create_test_file(store: &InMemory, path: &str, size: u64) -> Result<()> {
        let data = vec![0u8; size as usize];
        // Fill the data with a pattern: index % 256
        // This makes it easy to verify ranges
        let data: Vec<u8> = data
            .iter()
            .enumerate()
            .map(|(i, _)| (i % 256) as u8)
            .collect();

        let path = Path::from(path);
        store.put(&path, Bytes::from(data).into()).await?;
        Ok(())
    }

    // Helper function to read a range from the store and verify it
    async fn verify_range(store: &dyn ObjectStore, path: &str, range: Range<u64>) -> Result<()> {
        let path = Path::from(path);
        let result = store.get_range(&path, range.clone()).await?;

        // Verify that the returned data matches the expected pattern
        for (i, byte) in result.iter().enumerate() {
            let expected = ((range.start + i as u64) % 256) as u8;
            assert_eq!(*byte, expected, "Mismatch at position {i}");
        }

        Ok(())
    }

    // Test reading a small file (less than one chunk)
    #[tokio::test]
    async fn test_small_file() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        // Create a small file (10KB)
        let file_path = "small_file.bin";
        create_test_file(&inner, file_path, 10 * 1024).await?;

        // Read the entire file
        verify_range(&cache, file_path, 0..10 * 1024).await?;

        // Verify the file was cached
        let cache_dir = cache.get_cache_dir_for_path(&Path::from(file_path));
        assert!(cache_dir.exists(), "Cache directory should exist");

        let chunk_path = cache.get_chunk_path(&Path::from(file_path), 0);
        assert!(
            cache.is_chunk_ready(&chunk_path),
            "Chunk file should be ready"
        );

        Ok(())
    }

    // Test reading a large file (multiple chunks)
    #[tokio::test]
    async fn test_large_file() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        // Create a file slightly larger than 2 chunks (9MB)
        let file_path = "large_file.bin";
        let file_size = CACHE_BLOCK_SIZE * 2 + 1024 * 1024; // 9MB
        create_test_file(&inner, file_path, file_size).await?;

        // Read the entire file
        verify_range(&cache, file_path, 0..file_size).await?;

        // Verify all chunks were cached
        for chunk_idx in 0..=2 {
            let chunk_path = cache.get_chunk_path(&Path::from(file_path), chunk_idx);
            assert!(
                cache.is_chunk_ready(&chunk_path),
                "Chunk {chunk_idx} should be ready"
            );
        }

        Ok(())
    }

    // Test reading a range within a single chunk
    #[tokio::test]
    async fn test_range_within_chunk() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        // Create a file spanning three chunks
        let file_path = "range_test.bin";
        let file_size = CACHE_BLOCK_SIZE * 3; // 12MB
        create_test_file(&inner, file_path, file_size).await?;

        // Read a range entirely within the second chunk
        let start = CACHE_BLOCK_SIZE + 1024;
        let end = CACHE_BLOCK_SIZE + 2048;
        verify_range(&cache, file_path, start..end).await?;

        // Verify only the requested chunk was cached
        let chunk1_path = cache.get_chunk_path(&Path::from(file_path), 1);
        assert!(
            cache.is_chunk_ready(&chunk1_path),
            "Chunk 1 should be ready"
        );

        // Other chunks should not be cached yet
        let chunk0_path = cache.get_chunk_path(&Path::from(file_path), 0);
        let chunk2_path = cache.get_chunk_path(&Path::from(file_path), 2);
        assert!(
            !cache.is_chunk_ready(&chunk0_path),
            "Chunk 0 should not be ready yet"
        );
        assert!(
            !cache.is_chunk_ready(&chunk2_path),
            "Chunk 2 should not be ready yet"
        );

        Ok(())
    }

    // Test reading a range that spans multiple chunks
    #[tokio::test]
    async fn test_range_across_chunks() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        // Create a file spanning three chunks
        let file_path = "multi_chunk_range.bin";
        let file_size = CACHE_BLOCK_SIZE * 3; // 12MB
        create_test_file(&inner, file_path, file_size).await?;

        // Read a range that starts in chunk 0 and ends in chunk 2
        let start = CACHE_BLOCK_SIZE - 1024;
        let end = CACHE_BLOCK_SIZE * 2 + 1024;
        verify_range(&cache, file_path, start..end).await?;

        // Verify all three chunks were cached
        let chunk0_path = cache.get_chunk_path(&Path::from(file_path), 0);
        let chunk1_path = cache.get_chunk_path(&Path::from(file_path), 1);
        let chunk2_path = cache.get_chunk_path(&Path::from(file_path), 2);
        assert!(
            cache.is_chunk_ready(&chunk0_path),
            "Chunk 0 should be ready"
        );
        assert!(
            cache.is_chunk_ready(&chunk1_path),
            "Chunk 1 should be ready"
        );
        assert!(
            cache.is_chunk_ready(&chunk2_path),
            "Chunk 2 should be ready"
        );

        Ok(())
    }

    // Test cache hit (read the same file twice)
    #[tokio::test]
    async fn test_cache_hit() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        // Create a file
        let file_path = "cache_hit.bin";
        let file_size = CACHE_BLOCK_SIZE + 1024; // Slightly more than one chunk
        create_test_file(&inner, file_path, file_size).await?;

        // Read the file to populate the cache
        verify_range(&cache, file_path, 0..file_size).await?;

        // Modify the original file in the inner store to verify we're reading from cache
        let modified_data = vec![255u8; file_size as usize];
        let path = Path::from(file_path);
        inner.put(&path, Bytes::from(modified_data).into()).await?;

        // Read the same range again - should get the original data from cache, not the modified data
        verify_range(&cache, file_path, 0..file_size).await?;

        Ok(())
    }

    // Test suffix range requests
    #[tokio::test]
    async fn test_suffix_range() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        // Create a file
        let file_path = "suffix_range.bin";
        let file_size = CACHE_BLOCK_SIZE * 2; // 8MB
        create_test_file(&inner, file_path, file_size).await?;

        // Request the last 1MB of the file using GetRange::Suffix
        let path = Path::from(file_path);
        let options = GetOptions {
            range: Some(GetRange::Suffix(1024 * 1024)),
            ..Default::default()
        };

        let result = cache.get_opts(&path, options).await?;
        let data = result.bytes().await?;

        // Verify we got the right data size
        assert_eq!(data.len(), 1024 * 1024);

        // Verify the content matches the expected pattern
        let start = file_size - 1024 * 1024;
        for (i, byte) in data.iter().enumerate() {
            let expected = ((start + i as u64) % 256) as u8;
            assert_eq!(*byte, expected, "Mismatch at position {i}");
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_persistent_cache() -> Result<()> {
        // Create an InMemory store as the inner store
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache_dir_path = temp_dir.path().to_path_buf();

        // Create a file in the inner store
        let file_path = "persistent_test.bin";
        let file_size = CACHE_BLOCK_SIZE + 1024; // Slightly more than one chunk
        create_test_file(&inner, file_path, file_size).await?;

        // First cache instance - read data to populate cache
        {
            let first_cache = ByteCache::new(inner.clone(), cache_dir_path.clone());
            verify_range(&first_cache, file_path, 0..file_size).await?;

            // Verify data was cached
            let chunk_path = first_cache.get_chunk_path(&Path::from(file_path), 0);
            assert!(
                first_cache.is_chunk_ready(&chunk_path),
                "First chunk should be cached"
            );
        }

        // Modify the data in the inner store to verify the second cache uses cached data
        let modified_data = vec![255u8; file_size as usize];
        let path = Path::from(file_path);
        inner.put(&path, Bytes::from(modified_data).into()).await?;

        // Create a new cache instance pointing to the same directory
        let second_cache = ByteCache::new(inner.clone(), cache_dir_path);

        // Read the data through the second cache - should get original data from cache
        verify_range(&second_cache, file_path, 0..file_size).await?;

        // Additional verification - check if a specific range is read correctly
        let mid_range = file_size / 2;
        verify_range(&second_cache, file_path, mid_range..mid_range + 1024).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_object_store_metrics() -> Result<()> {
        let inner = Arc::new(MockStore::new_with_files(
            1,
            (CACHE_BLOCK_SIZE * 3) as usize,
        ));
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        let path = Path::from("0.parquet");
        let start = CACHE_BLOCK_SIZE / 2;
        let end = CACHE_BLOCK_SIZE + CACHE_BLOCK_SIZE / 2;

        verify_range(&cache, path.as_ref(), start..end).await?;

        let ranges = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges[0], 0..CACHE_BLOCK_SIZE);
        assert_eq!(ranges[1], CACHE_BLOCK_SIZE..CACHE_BLOCK_SIZE * 2);

        // The second request should hit the cache and not increase the range count
        verify_range(&cache, path.as_ref(), start + 1024..end - 1024).await?;
        let ranges_after = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges_after.len(), 2);

        Ok(())
    }
}