use std::{
fmt::{Display, Formatter},
fs,
ops::{Range, RangeInclusive},
path::PathBuf,
sync::Arc,
};
use async_stream::stream;
use async_trait::async_trait;
use bytes::Bytes;
use futures::{Stream, stream::BoxStream};
use object_store::{
Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
path::Path,
};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
/// Size of one on-disk cache block: 4 MiB. Remote reads are rounded out to
/// whole blocks so that neighbouring requests hit the local cache.
const CACHE_BLOCK_SIZE: u64 = 1024 * 1024 * 4;

/// An `ObjectStore` wrapper that caches fixed-size byte ranges ("chunks") of
/// remote objects as files on the local filesystem.
///
/// NOTE(review): cached chunks are never invalidated — once a chunk is on
/// disk it is served even if the underlying object changes (the test module
/// relies on this).
#[derive(Debug, Clone)]
pub struct ByteCache {
    // The wrapped store that actually holds the data.
    inner: Arc<dyn ObjectStore>,
    // Root directory under which per-object chunk directories are created.
    cache_dir: PathBuf,
}
impl ByteCache {
pub fn new(inner: Arc<dyn ObjectStore>, cache_dir: PathBuf) -> Self {
if !cache_dir.exists() {
fs::create_dir_all(&cache_dir).expect("Failed to create cache directory");
}
Self { inner, cache_dir }
}
fn get_cache_dir_for_path(&self, path: &Path) -> PathBuf {
let path_str = path.as_ref().replace("/", "_");
self.cache_dir.join(path_str)
}
fn get_chunk_path(&self, path: &Path, chunk_index: u64) -> PathBuf {
let cache_dir = self.get_cache_dir_for_path(path);
cache_dir.join(format!("chunk_{chunk_index}.bin"))
}
fn get_temp_chunk_path(&self, path: &Path, chunk_index: u64) -> PathBuf {
let cache_dir = self.get_cache_dir_for_path(path);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let thread_id = std::thread::current().id();
cache_dir.join(format!(
"in-progress-chunk_{chunk_index}_{now}_{thread_id:?}.bin"
))
}
fn chunks_for_range(&self, range: &Range<u64>) -> RangeInclusive<u64> {
let start_chunk = range.start / CACHE_BLOCK_SIZE;
let end_chunk = (range.end - 1) / CACHE_BLOCK_SIZE; start_chunk..=end_chunk
}
fn is_chunk_ready(&self, chunk_path: &std::path::Path) -> bool {
chunk_path.exists()
}
async fn read_from_cached_chunk(
&self,
chunk_path: PathBuf,
offset: u64,
len: usize,
) -> Result<Bytes> {
let mut file = tokio::fs::File::open(&chunk_path)
.await
.map_err(|e| Error::Generic {
store: "ByteCache",
source: Box::new(e),
})?;
let mut buffer = vec![0u8; len];
file.seek(tokio::io::SeekFrom::Start(offset))
.await
.map_err(|e| Error::Generic {
store: "ByteCache",
source: Box::new(e),
})?;
file.read_exact(&mut buffer)
.await
.map_err(|e| Error::Generic {
store: "ByteCache",
source: Box::new(e),
})?;
Ok(Bytes::from(buffer))
}
async fn save_chunk(&self, path: &Path, chunk_index: u64, data: Bytes) -> Result<()> {
let cache_dir = self.get_cache_dir_for_path(path);
if !cache_dir.exists() {
fs::create_dir_all(&cache_dir).map_err(|e| Error::Generic {
store: "ByteCache",
source: Box::new(e),
})?;
}
let temp_chunk_path = self.get_temp_chunk_path(path, chunk_index);
let final_chunk_path = self.get_chunk_path(path, chunk_index);
let mut file = tokio::fs::File::create(&temp_chunk_path)
.await
.map_err(|e| Error::Generic {
store: "ByteCache",
source: Box::new(e),
})?;
file.write_all(&data).await.map_err(|e| Error::Generic {
store: "ByteCache",
source: Box::new(e),
})?;
file.sync_all().await.map_err(|e| Error::Generic {
store: "ByteCache",
source: Box::new(e),
})?;
drop(file);
match tokio::fs::rename(&temp_chunk_path, &final_chunk_path).await {
Ok(_) => Ok(()),
Err(e) => {
let _ = tokio::fs::remove_file(&temp_chunk_path).await;
if final_chunk_path.exists() {
Ok(())
} else {
Err(Error::Generic {
store: "ByteCache",
source: Box::new(e),
})
}
}
}
}
fn get_range_from_cache_stream(
&self,
location: &Path,
range: &Range<u64>,
) -> impl Stream<Item = Result<Bytes>> + Send + 'static {
let this = self.clone();
let location = location.clone();
let range = range.clone();
stream! {
let chunks_needed = this.chunks_for_range(&range);
for chunk_idx in chunks_needed {
let chunk_path = this.get_chunk_path(&location, chunk_idx);
let chunk_start = chunk_idx * CACHE_BLOCK_SIZE;
let chunk_end = chunk_start + CACHE_BLOCK_SIZE;
let overlap_start = std::cmp::max(chunk_start, range.start);
let overlap_end = std::cmp::min(chunk_end, range.end);
if overlap_start < overlap_end {
let offset_in_chunk = overlap_start - chunk_start;
let length = overlap_end - overlap_start;
yield this
.read_from_cached_chunk(chunk_path, offset_in_chunk, length as usize)
.await;
}
}
}
}
}
impl Display for ByteCache {
    /// Renders a short human-readable description naming the cache root.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let dir = &self.cache_dir;
        write!(f, "ByteCache(cache_dir: {dir:?})")
    }
}
#[async_trait]
impl ObjectStore for ByteCache {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.inner.list(prefix)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.inner.list_with_delimiter(prefix).await
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let meta = self.inner.head(location).await?;
let file_size = meta.size;
if options.head {
return self.inner.get_opts(location, options).await;
}
let range = match &options.range {
Some(GetRange::Bounded(range)) => range.clone(),
Some(GetRange::Suffix(suffix)) => (file_size.saturating_sub(*suffix))..file_size,
Some(GetRange::Offset(offset)) => *offset..file_size,
None => 0..file_size,
};
let chunks_needed = self.chunks_for_range(&range);
let mut missing_chunks = Vec::new();
for chunk_idx in chunks_needed {
let chunk_path = self.get_chunk_path(location, chunk_idx);
if !self.is_chunk_ready(&chunk_path) {
missing_chunks.push(chunk_idx);
}
}
for chunk_idx in missing_chunks {
let chunk_start = chunk_idx * CACHE_BLOCK_SIZE;
let chunk_end = std::cmp::min(chunk_start + CACHE_BLOCK_SIZE, file_size);
let chunk_range = GetRange::Bounded(chunk_start..chunk_end);
let chunk_options = GetOptions {
range: Some(chunk_range),
..options.clone()
};
let chunk_result = self.inner.get_opts(location, chunk_options).await?;
let chunk_data = chunk_result.bytes().await?;
self.save_chunk(location, chunk_idx, chunk_data).await?;
}
Ok(GetResult {
payload: GetResultPayload::Stream(Box::pin(
self.get_range_from_cache_stream(location, &range),
)),
meta,
range,
attributes: Default::default(),
})
}
async fn put_opts(
&self,
_location: &Path,
_payload: PutPayload,
_opts: PutOptions,
) -> Result<PutResult> {
unreachable!("ByteCache does not support put")
}
async fn put_multipart_opts(
&self,
_location: &Path,
_opts: PutMultipartOptions,
) -> Result<Box<dyn MultipartUpload>> {
unreachable!("ByteCache does not support multipart upload")
}
async fn delete(&self, _location: &Path) -> Result<()> {
unreachable!("ByteCache does not support delete")
}
async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
unreachable!("ByteCache does not support copy")
}
async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
unreachable!("ByteCache does not support copy_if_not_exists")
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use liquid_cache_common::mock_store::MockStore;
    use object_store::memory::InMemory;
    use std::ops::Range;
    use tempfile::tempdir;

    // Writes `size` bytes to `path` where byte i == i % 256, so any range
    // read can later be verified positionally.
    async fn create_test_file(store: &InMemory, path: &str, size: u64) -> Result<()> {
        let data = vec![0u8; size as usize];
        let data: Vec<u8> = data
            .iter()
            .enumerate()
            .map(|(i, _)| (i % 256) as u8)
            .collect();
        let path = Path::from(path);
        store.put(&path, Bytes::from(data).into()).await?;
        Ok(())
    }

    // Reads `range` from `store` and asserts every byte matches the
    // (absolute_position % 256) pattern written by `create_test_file`.
    async fn verify_range(store: &dyn ObjectStore, path: &str, range: Range<u64>) -> Result<()> {
        let path = Path::from(path);
        let result = store.get_range(&path, range.clone()).await?;
        for (i, byte) in result.iter().enumerate() {
            let expected = ((range.start + i as u64) % 256) as u8;
            assert_eq!(*byte, expected, "Mismatch at position {i}");
        }
        Ok(())
    }

    // A file smaller than one block is served correctly and materialises a
    // single chunk file (chunk 0) on disk.
    #[tokio::test]
    async fn test_small_file() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());
        let file_path = "small_file.bin";
        create_test_file(&inner, file_path, 10 * 1024).await?;
        verify_range(&cache, file_path, 0..10 * 1024).await?;
        let cache_dir = cache.get_cache_dir_for_path(&Path::from(file_path));
        assert!(cache_dir.exists(), "Cache directory should exist");
        let chunk_path = cache.get_chunk_path(&Path::from(file_path), 0);
        assert!(
            cache.is_chunk_ready(&chunk_path),
            "Chunk file should be ready"
        );
        Ok(())
    }

    // A file spanning three blocks (last one partial) is fully readable and
    // all three chunk files end up cached.
    #[tokio::test]
    async fn test_large_file() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());
        let file_path = "large_file.bin";
        let file_size = CACHE_BLOCK_SIZE * 2 + 1024 * 1024;
        create_test_file(&inner, file_path, file_size).await?;
        verify_range(&cache, file_path, 0..file_size).await?;
        for chunk_idx in 0..=2 {
            let chunk_path = cache.get_chunk_path(&Path::from(file_path), chunk_idx);
            assert!(
                cache.is_chunk_ready(&chunk_path),
                "Chunk {chunk_idx} should be ready"
            );
        }
        Ok(())
    }

    // A range that lies entirely inside chunk 1 caches only chunk 1; the
    // neighbouring chunks must remain unfetched.
    #[tokio::test]
    async fn test_range_within_chunk() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());
        let file_path = "range_test.bin";
        let file_size = CACHE_BLOCK_SIZE * 3;
        create_test_file(&inner, file_path, file_size).await?;
        let start = CACHE_BLOCK_SIZE + 1024;
        let end = CACHE_BLOCK_SIZE + 2048;
        verify_range(&cache, file_path, start..end).await?;
        let chunk1_path = cache.get_chunk_path(&Path::from(file_path), 1);
        assert!(
            cache.is_chunk_ready(&chunk1_path),
            "Chunk 1 should be ready"
        );
        let chunk0_path = cache.get_chunk_path(&Path::from(file_path), 0);
        let chunk2_path = cache.get_chunk_path(&Path::from(file_path), 2);
        assert!(
            !cache.is_chunk_ready(&chunk0_path),
            "Chunk 0 should not be ready yet"
        );
        assert!(
            !cache.is_chunk_ready(&chunk2_path),
            "Chunk 2 should not be ready yet"
        );
        Ok(())
    }

    // A range straddling two chunk boundaries caches all three touched chunks.
    #[tokio::test]
    async fn test_range_across_chunks() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());
        let file_path = "multi_chunk_range.bin";
        let file_size = CACHE_BLOCK_SIZE * 3;
        create_test_file(&inner, file_path, file_size).await?;
        let start = CACHE_BLOCK_SIZE - 1024;
        let end = CACHE_BLOCK_SIZE * 2 + 1024;
        verify_range(&cache, file_path, start..end).await?;
        let chunk0_path = cache.get_chunk_path(&Path::from(file_path), 0);
        let chunk1_path = cache.get_chunk_path(&Path::from(file_path), 1);
        let chunk2_path = cache.get_chunk_path(&Path::from(file_path), 2);
        assert!(
            cache.is_chunk_ready(&chunk0_path),
            "Chunk 0 should be ready"
        );
        assert!(
            cache.is_chunk_ready(&chunk1_path),
            "Chunk 1 should be ready"
        );
        assert!(
            cache.is_chunk_ready(&chunk2_path),
            "Chunk 2 should be ready"
        );
        Ok(())
    }

    // Once cached, data is served from disk: mutating the inner store does
    // not change what the cache returns (no invalidation by design).
    #[tokio::test]
    async fn test_cache_hit() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());
        let file_path = "cache_hit.bin";
        let file_size = CACHE_BLOCK_SIZE + 1024;
        create_test_file(&inner, file_path, file_size).await?;
        verify_range(&cache, file_path, 0..file_size).await?;
        let modified_data = vec![255u8; file_size as usize];
        let path = Path::from(file_path);
        inner.put(&path, Bytes::from(modified_data).into()).await?;
        verify_range(&cache, file_path, 0..file_size).await?;
        Ok(())
    }

    // GetRange::Suffix resolves against the object size and returns exactly
    // the trailing bytes.
    #[tokio::test]
    async fn test_suffix_range() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());
        let file_path = "suffix_range.bin";
        let file_size = CACHE_BLOCK_SIZE * 2;
        create_test_file(&inner, file_path, file_size).await?;
        let path = Path::from(file_path);
        let options = GetOptions {
            range: Some(GetRange::Suffix(1024 * 1024)),
            ..Default::default()
        };
        let result = cache.get_opts(&path, options).await?;
        let data = result.bytes().await?;
        assert_eq!(data.len(), 1024 * 1024);
        let start = file_size - 1024 * 1024;
        for (i, byte) in data.iter().enumerate() {
            let expected = ((start + i as u64) % 256) as u8;
            assert_eq!(*byte, expected, "Mismatch at position {i}");
        }
        Ok(())
    }

    // Chunk files persist across ByteCache instances that share a cache
    // directory, and are served even after the inner object changes.
    #[tokio::test]
    async fn test_persistent_cache() -> Result<()> {
        let inner = Arc::new(InMemory::new());
        let temp_dir = tempdir().unwrap();
        let cache_dir_path = temp_dir.path().to_path_buf();
        let file_path = "persistent_test.bin";
        let file_size = CACHE_BLOCK_SIZE + 1024;
        create_test_file(&inner, file_path, file_size).await?;
        {
            // First instance populates the cache, then is dropped.
            let first_cache = ByteCache::new(inner.clone(), cache_dir_path.clone());
            verify_range(&first_cache, file_path, 0..file_size).await?;
            let chunk_path = first_cache.get_chunk_path(&Path::from(file_path), 0);
            assert!(
                first_cache.is_chunk_ready(&chunk_path),
                "First chunk should be cached"
            );
        }
        let modified_data = vec![255u8; file_size as usize];
        let path = Path::from(file_path);
        inner.put(&path, Bytes::from(modified_data).into()).await?;
        // Second instance reuses the same cache dir and must still see the
        // original (pre-modification) bytes.
        let second_cache = ByteCache::new(inner.clone(), cache_dir_path);
        verify_range(&second_cache, file_path, 0..file_size).await?;
        let mid_range = file_size / 2;
        verify_range(&second_cache, file_path, mid_range..mid_range + 1024).await?;
        Ok(())
    }

    // The inner store is hit once per missing chunk with whole, block-aligned
    // ranges; a repeat read within cached chunks adds no new accesses.
    #[tokio::test]
    async fn test_object_store_metrics() -> Result<()> {
        let inner = Arc::new(MockStore::new_with_files(
            1,
            (CACHE_BLOCK_SIZE * 3) as usize,
        ));
        let temp_dir = tempdir().unwrap();
        let cache = ByteCache::new(inner.clone(), temp_dir.path().to_path_buf());
        let path = Path::from("0.parquet");
        let start = CACHE_BLOCK_SIZE / 2;
        let end = CACHE_BLOCK_SIZE + CACHE_BLOCK_SIZE / 2;
        verify_range(&cache, path.as_ref(), start..end).await?;
        let ranges = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges[0], 0..CACHE_BLOCK_SIZE);
        assert_eq!(ranges[1], CACHE_BLOCK_SIZE..CACHE_BLOCK_SIZE * 2);
        verify_range(&cache, path.as_ref(), start + 1024..end - 1024).await?;
        let ranges_after = inner.get_access_ranges(&path).unwrap();
        assert_eq!(ranges_after.len(), 2);
        Ok(())
    }
}