Skip to main content

liquid_cache/utils/
byte_cache.rs

1use std::{
2    fmt::{Display, Formatter},
3    fs,
4    ops::{Range, RangeInclusive},
5    path::PathBuf,
6    sync::Arc,
7};
8
9use async_stream::stream;
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures::{Stream, stream::BoxStream};
13use object_store::{
14    CopyOptions, Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult,
15    MultipartUpload, ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions,
16    PutPayload, PutResult, Result, path::Path,
17};
18use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
19
/// Size in bytes of every cached chunk file (4 MiB); only an object's last chunk may be shorter.
const CACHE_BLOCK_SIZE: u64 = 4 << 20;
21
22/// Byte cache for liquid cache, act like a object store.
23#[derive(Debug, Clone)]
24pub struct ByteCache {
25    inner: Arc<dyn ObjectStore>,
26    cache_dir: PathBuf,
27}
28
29impl ByteCache {
30    /// Create a new byte cache, the cache_dir is the directory to store the cached files
31    /// `ByteCache` can read from a previously initialized cache directory
32    pub fn new(inner: Arc<dyn ObjectStore>, cache_dir: PathBuf) -> Self {
33        if !cache_dir.exists() {
34            fs::create_dir_all(&cache_dir).expect("Failed to create cache directory");
35        }
36        Self { inner, cache_dir }
37    }
38
39    /// Convert a path to a cached directory path
40    fn get_cache_dir_for_path(&self, path: &Path) -> PathBuf {
41        let path_str = path.as_ref().replace("/", "_");
42        self.cache_dir.join(path_str)
43    }
44
45    /// Get the path for a specific chunk of a file
46    fn get_chunk_path(&self, path: &Path, chunk_index: u64) -> PathBuf {
47        let cache_dir = self.get_cache_dir_for_path(path);
48        cache_dir.join(format!("chunk_{chunk_index}.bin"))
49    }
50
51    /// Get the temporary path for a chunk while it's being written
52    fn get_temp_chunk_path(&self, path: &Path, chunk_index: u64) -> PathBuf {
53        let cache_dir = self.get_cache_dir_for_path(path);
54        // Include timestamp and thread ID to avoid conflicts between concurrent writes
55        let now = std::time::SystemTime::now()
56            .duration_since(std::time::UNIX_EPOCH)
57            .unwrap_or_default()
58            .as_nanos();
59        let thread_id = std::thread::current().id();
60        cache_dir.join(format!(
61            "in-progress-chunk_{chunk_index}_{now}_{thread_id:?}.bin"
62        ))
63    }
64
65    /// Calculate which chunks are needed for a given range
66    fn chunks_for_range(&self, range: &Range<u64>) -> RangeInclusive<u64> {
67        let start_chunk = range.start / CACHE_BLOCK_SIZE;
68        let end_chunk = (range.end - 1) / CACHE_BLOCK_SIZE; // -1 because end is exclusive
69        start_chunk..=end_chunk
70    }
71
72    /// Check if a cached chunk is ready to be read (file exists, since rename is atomic)
73    fn is_chunk_ready(&self, chunk_path: &std::path::Path) -> bool {
74        chunk_path.exists()
75    }
76
77    /// Read data from a cached chunk
78    async fn read_from_cached_chunk(
79        &self,
80        chunk_path: PathBuf,
81        offset: u64,
82        len: usize,
83    ) -> Result<Bytes> {
84        let mut file = tokio::fs::File::open(&chunk_path)
85            .await
86            .map_err(|e| Error::Generic {
87                store: "ByteCache",
88                source: Box::new(e),
89            })?;
90
91        let mut buffer = vec![0u8; len];
92        file.seek(tokio::io::SeekFrom::Start(offset))
93            .await
94            .map_err(|e| Error::Generic {
95                store: "ByteCache",
96                source: Box::new(e),
97            })?;
98
99        file.read_exact(&mut buffer)
100            .await
101            .map_err(|e| Error::Generic {
102                store: "ByteCache",
103                source: Box::new(e),
104            })?;
105
106        Ok(Bytes::from(buffer))
107    }
108
109    /// Save a chunk to the cache
110    async fn save_chunk(&self, path: &Path, chunk_index: u64, data: Bytes) -> Result<()> {
111        let cache_dir = self.get_cache_dir_for_path(path);
112        if !cache_dir.exists() {
113            fs::create_dir_all(&cache_dir).map_err(|e| Error::Generic {
114                store: "ByteCache",
115                source: Box::new(e),
116            })?;
117        }
118
119        // Write to a temporary file first
120        let temp_chunk_path = self.get_temp_chunk_path(path, chunk_index);
121        let final_chunk_path = self.get_chunk_path(path, chunk_index);
122
123        let mut file = tokio::fs::File::create(&temp_chunk_path)
124            .await
125            .map_err(|e| Error::Generic {
126                store: "ByteCache",
127                source: Box::new(e),
128            })?;
129
130        // Write the data to the temporary file
131        file.write_all(&data).await.map_err(|e| Error::Generic {
132            store: "ByteCache",
133            source: Box::new(e),
134        })?;
135
136        // Sync to ensure data is written to disk before rename
137        file.sync_all().await.map_err(|e| Error::Generic {
138            store: "ByteCache",
139            source: Box::new(e),
140        })?;
141
142        // Close the file before rename
143        drop(file);
144
145        // Atomically rename the temporary file to the final location
146        // If another thread already created the file, that's fine - we can just clean up our temp file
147        match tokio::fs::rename(&temp_chunk_path, &final_chunk_path).await {
148            Ok(_) => Ok(()),
149            Err(e) => {
150                let _ = tokio::fs::remove_file(&temp_chunk_path).await;
151                if final_chunk_path.exists() {
152                    Ok(())
153                } else {
154                    Err(Error::Generic {
155                        store: "ByteCache",
156                        source: Box::new(e),
157                    })
158                }
159            }
160        }
161    }
162
163    /// Get range data from cached chunks
164    fn get_range_from_cache_stream(
165        &self,
166        location: &Path,
167        range: &Range<u64>,
168    ) -> impl Stream<Item = Result<Bytes>> + Send + 'static {
169        let this = self.clone();
170        let location = location.clone();
171        let range = range.clone();
172        stream! {
173            let chunks_needed = this.chunks_for_range(&range);
174            for chunk_idx in chunks_needed {
175                let chunk_path = this.get_chunk_path(&location, chunk_idx);
176                let chunk_start = chunk_idx * CACHE_BLOCK_SIZE;
177                let chunk_end = chunk_start + CACHE_BLOCK_SIZE;
178
179                let overlap_start = std::cmp::max(chunk_start, range.start);
180                let overlap_end = std::cmp::min(chunk_end, range.end);
181
182                if overlap_start < overlap_end {
183                    let offset_in_chunk = overlap_start - chunk_start;
184                    let length = overlap_end - overlap_start;
185
186                    yield this
187                        .read_from_cached_chunk(chunk_path, offset_in_chunk, length as usize)
188                        .await;
189                }
190            }
191        }
192    }
193}
194
195impl Display for ByteCache {
196    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
197        write!(f, "ByteCache(cache_dir: {:?})", self.cache_dir)
198    }
199}
200
201#[async_trait]
202impl ObjectStore for ByteCache {
203    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
204        self.inner.list(prefix)
205    }
206
207    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
208        self.inner.list_with_delimiter(prefix).await
209    }
210
211    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
212        // Get the metadata first to know the file size
213        let meta = self.inner.head(location).await?;
214        let file_size = meta.size;
215
216        // If this is just a HEAD request, return the metadata
217        if options.head {
218            return self.inner.get_opts(location, options).await;
219        }
220
221        // Determine the range we need to fetch
222        let range = match &options.range {
223            Some(GetRange::Bounded(range)) => range.clone(),
224            Some(GetRange::Suffix(suffix)) => (file_size.saturating_sub(*suffix))..file_size,
225            Some(GetRange::Offset(offset)) => *offset..file_size,
226            None => 0..file_size,
227        };
228
229        // Calculate which chunks we need
230        let chunks_needed = self.chunks_for_range(&range);
231
232        // Check which chunks are already cached
233        let mut missing_chunks = Vec::new();
234        for chunk_idx in chunks_needed {
235            let chunk_path = self.get_chunk_path(location, chunk_idx);
236            if !self.is_chunk_ready(&chunk_path) {
237                missing_chunks.push(chunk_idx);
238            }
239        }
240
241        // Fetch missing chunks from the underlying store
242        for chunk_idx in missing_chunks {
243            let chunk_start = chunk_idx * CACHE_BLOCK_SIZE;
244            let chunk_end = std::cmp::min(chunk_start + CACHE_BLOCK_SIZE, file_size);
245
246            let chunk_range = GetRange::Bounded(chunk_start..chunk_end);
247            let chunk_options = GetOptions {
248                range: Some(chunk_range),
249                ..options.clone()
250            };
251
252            let chunk_result = self.inner.get_opts(location, chunk_options).await?;
253            let chunk_data = chunk_result.bytes().await?;
254
255            // Save the chunk to cache
256            self.save_chunk(location, chunk_idx, chunk_data).await?;
257        }
258
259        // Return a GetResult with the stream of bytes from cache
260        Ok(GetResult {
261            payload: GetResultPayload::Stream(Box::pin(
262                self.get_range_from_cache_stream(location, &range),
263            )),
264            meta,
265            range,
266            attributes: Default::default(),
267        })
268    }
269
270    async fn put_opts(
271        &self,
272        _location: &Path,
273        _payload: PutPayload,
274        _opts: PutOptions,
275    ) -> Result<PutResult> {
276        unreachable!("ByteCache does not support put")
277    }
278
279    async fn put_multipart_opts(
280        &self,
281        _location: &Path,
282        _opts: PutMultipartOptions,
283    ) -> Result<Box<dyn MultipartUpload>> {
284        unreachable!("ByteCache does not support multipart upload")
285    }
286
287    fn delete_stream(
288        &self,
289        _locations: BoxStream<'static, Result<Path>>,
290    ) -> BoxStream<'static, Result<Path>> {
291        unreachable!("ByteCache does not support delete")
292    }
293
294    async fn copy_opts(&self, _from: &Path, _to: &Path, _options: CopyOptions) -> Result<()> {
295        unreachable!("ByteCache does not support copy")
296    }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use liquid_cache_common::mock_store::MockStore;
    use object_store::memory::InMemory;
    use std::ops::Range;
    use tempfile::tempdir;

    /// Store a `size`-byte object at `path` whose byte at offset `i` is `i % 256`.
    ///
    /// The position-derived pattern lets any range be verified without keeping the data.
    async fn create_test_file(store: &InMemory, path: &str, size: u64) -> Result<()> {
        let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
        store.put(&Path::from(path), Bytes::from(data).into()).await?;
        Ok(())
    }

    /// Read `range` of `path` through `store` and check its length and its contents
    /// against the pattern written by `create_test_file`.
    async fn verify_range(store: &dyn ObjectStore, path: &str, range: Range<u64>) -> Result<()> {
        let path = Path::from(path);
        let result = store.get_range(&path, range.clone()).await?;

        // Without this check, a short (or empty) read would pass the content loop below.
        assert_eq!(
            result.len() as u64,
            range.end - range.start,
            "Unexpected length for range {range:?}"
        );
        for (i, byte) in result.iter().enumerate() {
            let expected = ((range.start + i as u64) % 256) as u8;
            assert_eq!(*byte, expected, "Mismatch at position {i}");
        }

        Ok(())
    }

    // Test reading a small file (less than one chunk)
    #[tokio::test]
    async fn test_small_file() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        // Create a small file (10KB)
        let file_path = "small_file.bin";
        create_test_file(&inner, file_path, 10 * 1024).await?;

        // Read the entire file
        verify_range(&cache, file_path, 0..10 * 1024).await?;

        // Verify the file was cached
        let cache_dir = cache.get_cache_dir_for_path(&Path::from(file_path));
        assert!(cache_dir.exists(), "Cache directory should exist");

        let chunk_path = cache.get_chunk_path(&Path::from(file_path), 0);
        assert!(
            cache.is_chunk_ready(&chunk_path),
            "Chunk file should be ready"
        );

        Ok(())
    }

    // Test reading a large file (multiple chunks)
    #[tokio::test]
    async fn test_large_file() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        // Create a file slightly larger than 2 chunks (9MB)
        let file_path = "large_file.bin";
        let file_size = CACHE_BLOCK_SIZE * 2 + 1024 * 1024; // 9MB
        create_test_file(&inner, file_path, file_size).await?;

        // Read the entire file
        verify_range(&cache, file_path, 0..file_size).await?;

        // Verify all chunks were cached
        for chunk_idx in 0..=2 {
            let chunk_path = cache.get_chunk_path(&Path::from(file_path), chunk_idx);
            assert!(
                cache.is_chunk_ready(&chunk_path),
                "Chunk {chunk_idx} should be ready"
            );
        }

        Ok(())
    }

    // Test reading a range within a single chunk
    #[tokio::test]
    async fn test_range_within_chunk() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        // Create a file larger than one chunk
        let file_path = "range_test.bin";
        let file_size = CACHE_BLOCK_SIZE * 3; // 12MB
        create_test_file(&inner, file_path, file_size).await?;

        // Read a range entirely within the second chunk
        let start = CACHE_BLOCK_SIZE + 1024;
        let end = CACHE_BLOCK_SIZE + 2048;
        verify_range(&cache, file_path, start..end).await?;

        // Verify only the requested chunk was cached
        let chunk1_path = cache.get_chunk_path(&Path::from(file_path), 1);
        assert!(
            cache.is_chunk_ready(&chunk1_path),
            "Chunk 1 should be ready"
        );

        // Other chunks should not be cached yet
        let chunk0_path = cache.get_chunk_path(&Path::from(file_path), 0);
        let chunk2_path = cache.get_chunk_path(&Path::from(file_path), 2);
        assert!(
            !cache.is_chunk_ready(&chunk0_path),
            "Chunk 0 should not be ready yet"
        );
        assert!(
            !cache.is_chunk_ready(&chunk2_path),
            "Chunk 2 should not be ready yet"
        );

        Ok(())
    }

    // Test reading a range that spans multiple chunks
    #[tokio::test]
    async fn test_range_across_chunks() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        // Create a file larger than two chunks
        let file_path = "multi_chunk_range.bin";
        let file_size = CACHE_BLOCK_SIZE * 3; // 12MB
        create_test_file(&inner, file_path, file_size).await?;

        // Read from 1KB before the end of chunk 0 to 1KB into chunk 2,
        // so the range touches chunks 0, 1 and 2
        let start = CACHE_BLOCK_SIZE - 1024;
        let end = CACHE_BLOCK_SIZE * 2 + 1024;
        verify_range(&cache, file_path, start..end).await?;

        // Verify the chunks were cached
        let chunk0_path = cache.get_chunk_path(&Path::from(file_path), 0);
        let chunk1_path = cache.get_chunk_path(&Path::from(file_path), 1);
        let chunk2_path = cache.get_chunk_path(&Path::from(file_path), 2);
        assert!(
            cache.is_chunk_ready(&chunk0_path),
            "Chunk 0 should be ready"
        );
        assert!(
            cache.is_chunk_ready(&chunk1_path),
            "Chunk 1 should be ready"
        );
        assert!(
            cache.is_chunk_ready(&chunk2_path),
            "Chunk 2 should be ready"
        );

        Ok(())
    }

    // Test cache hit (read the same file twice)
    #[tokio::test]
    async fn test_cache_hit() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        // Create a file
        let file_path = "cache_hit.bin";
        let file_size = CACHE_BLOCK_SIZE + 1024; // Slightly more than one chunk
        create_test_file(&inner, file_path, file_size).await?;

        // Read the file to populate the cache
        verify_range(&cache, file_path, 0..file_size).await?;

        // Modify the original file in the inner store to verify we're reading from cache
        let modified_data = vec![255u8; file_size as usize];
        let path = Path::from(file_path);
        inner.put(&path, Bytes::from(modified_data).into()).await?;

        // Read the same range again - should get the original data from cache, not the modified data
        verify_range(&cache, file_path, 0..file_size).await?;

        Ok(())
    }

    // Test a suffix range request (GetRange::Suffix)
    #[tokio::test]
    async fn test_suffix_range() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        // Create a file
        let file_path = "suffix_range.bin";
        let file_size = CACHE_BLOCK_SIZE * 2; // 8MB
        create_test_file(&inner, file_path, file_size).await?;

        // Request the last 1MB of the file using GetRange::Suffix
        let path = Path::from(file_path);
        let options = GetOptions {
            range: Some(GetRange::Suffix(1024 * 1024)),
            ..Default::default()
        };

        let result = cache.get_opts(&path, options).await?;
        let data = result.bytes().await?;

        // Verify we got the right data size
        assert_eq!(data.len(), 1024 * 1024);

        // Verify the content matches expected pattern
        let start = file_size - 1024 * 1024;
        for (i, byte) in data.iter().enumerate() {
            let expected = ((start + i as u64) % 256) as u8;
            assert_eq!(*byte, expected, "Mismatch at position {i}");
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_persistent_cache() -> Result<()> {
        // Create an InMemory store as the inner store
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache_dir_path = temp_dir.path().to_path_buf();

        // Create a file in the inner store
        let file_path = "persistent_test.bin";
        let file_size = CACHE_BLOCK_SIZE + 1024; // Slightly more than one chunk
        create_test_file(&inner, file_path, file_size).await?;

        // First cache instance - read data to populate cache
        {
            let first_cache = ByteCache::new(inner.clone(), cache_dir_path.clone());
            verify_range(&first_cache, file_path, 0..file_size).await?;

            // Verify data was cached
            let chunk_path = first_cache.get_chunk_path(&Path::from(file_path), 0);
            assert!(
                first_cache.is_chunk_ready(&chunk_path),
                "First chunk should be cached"
            );
        }

        // Modify the data in the inner store to verify the second cache uses cached data
        let modified_data = vec![255u8; file_size as usize];
        let path = Path::from(file_path);
        inner.put(&path, Bytes::from(modified_data).into()).await?;

        // Create a new cache instance pointing to the same directory
        let second_cache = ByteCache::new(inner.clone(), cache_dir_path);

        // Read the data through the second cache - should get original data from cache
        verify_range(&second_cache, file_path, 0..file_size).await?;

        // Additional verification - check if a specific range is read correctly
        let mid_range = file_size / 2;
        verify_range(&second_cache, file_path, mid_range..mid_range + 1024).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_object_store_metrics() -> Result<()> {
        let inner = Arc::new(MockStore::new_with_files(
            1,
            (CACHE_BLOCK_SIZE * 3) as usize,
        ));
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());

        let path = Path::from("0.parquet");
        let start = CACHE_BLOCK_SIZE / 2;
        let end = CACHE_BLOCK_SIZE + CACHE_BLOCK_SIZE / 2;

        verify_range(&cache, path.as_ref(), start..end).await?;

        // The inner store should see exactly one whole-chunk request per touched chunk
        let ranges = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges[0], 0..CACHE_BLOCK_SIZE);
        assert_eq!(ranges[1], CACHE_BLOCK_SIZE..CACHE_BLOCK_SIZE * 2);

        // second request should hit cache and not increase range count
        verify_range(&cache, path.as_ref(), start + 1024..end - 1024).await?;
        let ranges_after = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges_after.len(), 2);

        Ok(())
    }
}