use crate::cached_object_store::stats::CachedObjectStoreStats;
use crate::cached_object_store::storage_fs::FsCacheStorage;
use crate::cached_object_store::LocalCacheEntry;
use crate::config::ObjectStoreCacheOptions;
use crate::rand::DbRand;
use bytes::{Bytes, BytesMut};
use futures::{future::BoxFuture, stream, stream::BoxStream, StreamExt};
use object_store::{path::Path, GetOptions, GetResult, ObjectMeta, ObjectStore};
use object_store::{Attributes, GetRange, GetResultPayload, PutMultipartOptions, PutResult};
use object_store::{ListResult, MultipartUpload, PutOptions, PutPayload};
use slatedb_common::clock::SystemClock;
use std::{ops::Range, sync::Arc};
use tokio::sync::OnceCell;
use crate::cached_object_store::admission::AdmissionPicker;
use crate::cached_object_store::storage::{LocalCacheStorage, PartID};
use crate::error::SlateDBError;
use log::warn;
use crate::utils::build_concurrent;
use slatedb_common::metrics::MetricsRecorderHelper;
/// An `ObjectStore` decorator that transparently caches fixed-size object
/// parts (plus head metadata) in a local cache storage backend.
#[derive(Debug, Clone)]
pub(crate) struct CachedObjectStore {
    // The underlying remote object store that backs this cache.
    object_store: Arc<dyn ObjectStore>,
    // Size of each cached part in bytes; `new` enforces a non-zero multiple of 1024.
    pub(crate) part_size_bytes: usize,
    // Local storage backend where parts and head metadata are persisted.
    pub(crate) cache_storage: Arc<dyn LocalCacheStorage>,
    // Decides whether a given entry should be admitted into the cache.
    pub(crate) admission_picker: AdmissionPicker,
    // Whether successful puts are also written through into the cache.
    pub(crate) cache_puts: bool,
    // Cache-key prefix resolved lazily from the first observed `ObjectMeta`
    // location (see `resolve_root`); unset until resolution succeeds.
    resolved_root: Arc<OnceCell<Path>>,
    // Counters for part accesses / hits.
    stats: Arc<CachedObjectStoreStats>,
}
impl CachedObjectStore {
/// Builds a new cached store wrapper around `object_store`.
///
/// # Errors
/// Returns `SlateDBError::InvalidCachePartSize` unless `part_size_bytes` is a
/// non-zero multiple of 1024.
pub(crate) fn new(
    object_store: Arc<dyn ObjectStore>,
    cache_storage: Arc<dyn LocalCacheStorage>,
    part_size_bytes: usize,
    cache_puts: bool,
    stats: Arc<CachedObjectStoreStats>,
) -> Result<Arc<Self>, SlateDBError> {
    let valid_part_size = part_size_bytes > 0 && part_size_bytes.is_multiple_of(1024);
    if !valid_part_size {
        return Err(SlateDBError::InvalidCachePartSize);
    }
    let store = Self {
        object_store,
        part_size_bytes,
        cache_storage,
        stats,
        admission_picker: AdmissionPicker::default(),
        cache_puts,
        resolved_root: Arc::new(OnceCell::new()),
    };
    Ok(Arc::new(store))
}
/// Starts the cache storage's background evictor task.
pub(crate) async fn start_evictor(&self) {
    self.cache_storage.start_evictor().await;
}
/// Constructs a cached store from `ObjectStoreCacheOptions`.
///
/// Returns `Ok(None)` when no cache root folder is configured (caching
/// disabled). On success the background evictor is already running.
pub(crate) async fn from_config(
    object_store: Arc<dyn ObjectStore>,
    options: &ObjectStoreCacheOptions,
    recorder: &MetricsRecorderHelper,
    clock: Arc<dyn SystemClock>,
    rand: Arc<DbRand>,
) -> Result<Option<Arc<Self>>, SlateDBError> {
    let Some(root_folder) = options.root_folder.as_ref() else {
        // No root folder configured: caching is turned off.
        return Ok(None);
    };
    let stats = Arc::new(CachedObjectStoreStats::new(recorder));
    let storage = FsCacheStorage::new(
        root_folder.clone(),
        options.max_cache_size_bytes,
        options.scan_interval,
        stats.clone(),
        clock,
        rand,
    );
    let store = Self::new(
        object_store,
        Arc::new(storage),
        options.part_size_bytes,
        options.cache_puts,
        stats,
    )?;
    store.start_evictor().await;
    Ok(Some(store))
}
/// Preloads the given files into the local cache, up to `max_bytes` total.
///
/// Files are considered in order: each file's size is obtained via a `head`
/// request, and the first file that does not fit into the remaining budget
/// stops the scan, as does any `head` failure. The selected files are then
/// prefetched concurrently; prefetch errors are logged but never propagated.
pub(crate) async fn load_files_to_cache(
    &self,
    file_paths: Vec<Path>,
    max_bytes: usize,
) -> Result<(), SlateDBError> {
    if file_paths.is_empty() || max_bytes == 0 {
        return Ok(());
    }
    let mut remaining_bytes = max_bytes;
    let mut files_to_load = Vec::with_capacity(file_paths.len());
    for path in file_paths {
        match self.object_store.head(&path).await {
            Ok(meta) => {
                // Checked conversion instead of `as`: an object larger than
                // the address space must not silently truncate on 32-bit
                // targets. Such a file cannot fit the budget, so stop here
                // (matching the "does not fit" branch below).
                let file_size = match usize::try_from(meta.size) {
                    Ok(size) => size,
                    Err(_) => break,
                };
                if remaining_bytes >= file_size {
                    remaining_bytes -= file_size;
                    files_to_load.push(path);
                } else {
                    break;
                }
            }
            Err(e) => {
                warn!("Failed to preload all SSTs to cache: {:?}", e);
                break;
            }
        }
    }
    let degree_of_parallelism = 32;
    let _result = build_concurrent(files_to_load.into_iter(), degree_of_parallelism, |path| {
        let this = self.clone();
        async move {
            match this
                .maybe_prefetch_range(&path, GetOptions::default())
                .await
            {
                Ok(_) => Ok(Some(())),
                Err(e) => {
                    // Best-effort: a failed prefetch only costs a cache miss later.
                    warn!(
                        "Failed to prefetch file into cache [path={}, error={:?}]",
                        path, e
                    );
                    Ok(None)
                }
            }
        }
    })
    .await;
    Ok(())
}
/// Maps a caller-visible `location` to its cache key by prefixing it with the
/// lazily resolved root. Returns `None` while the root is still unresolved.
fn cache_location_for(&self, location: &Path) -> Option<Path> {
    let root = self.resolved_root.get()?;
    if root.as_ref().is_empty() {
        // An empty root means cache keys equal the requested locations.
        Some(location.clone())
    } else {
        Some(root.parts().chain(location.parts()).collect())
    }
}
/// Ensures the cache root is resolved and consistent with `meta_location`.
///
/// On first use the root is inferred from the difference between the path we
/// requested and the path reported in the store's `ObjectMeta`. Returns
/// `true` only when the resolved root maps `requested_location` exactly onto
/// `meta_location`; on any failure or mismatch callers must skip caching.
fn resolve_root(&self, requested_location: &Path, meta_location: &Path) -> bool {
    if self.resolved_root.get().is_none() {
        match Self::infer_root(requested_location, meta_location) {
            Some(root) => {
                // A concurrent resolution may win the race; either value
                // will be validated against `meta_location` below.
                let _ = self.resolved_root.set(root);
            }
            None => {
                warn!(
                    "failed to resolve cache root lazily [requested_location={}, meta_location={}]",
                    requested_location, meta_location,
                );
                return false;
            }
        }
    }
    let cache_location = match self.cache_location_for(requested_location) {
        Some(loc) => loc,
        None => {
            warn!(
                "cache location is unexpectedly unavailable after root resolution [requested_location={}, meta_location={}]",
                requested_location, meta_location,
            );
            return false;
        }
    };
    if &cache_location == meta_location {
        true
    } else {
        warn!(
            "resolved root mismatch [requested_location={}, cache_location={}, meta_location={}]",
            requested_location, cache_location, meta_location,
        );
        false
    }
}
/// Derives the root prefix such that `root / requested_location` equals
/// `meta_location`.
///
/// Returns `None` when `meta_location` does not end with `requested_location`
/// on a path-component boundary.
fn infer_root(requested_location: &Path, meta_location: &Path) -> Option<Path> {
    let requested = requested_location.as_ref();
    let meta = meta_location.as_ref();
    if requested.is_empty() {
        // An empty request path means the meta path itself is the root.
        return Some(meta_location.clone());
    }
    let prefix = meta.strip_suffix(requested)?;
    // The suffix must align on a '/' boundary (or the paths must be equal);
    // otherwise e.g. meta "ab/c" would wrongly match the request "b/c".
    if prefix.is_empty() || prefix.ends_with('/') {
        Some(Path::from(prefix.trim_end_matches('/')))
    } else {
        None
    }
}
/// `head` with cache support: serves metadata from the cached head entry when
/// available, otherwise falls back to the object store and opportunistically
/// saves the response into the cache.
pub(crate) async fn cached_head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
    if let Some(cache_location) = self.cache_location_for(location) {
        let entry = self
            .cache_storage
            .entry(&cache_location, self.part_size_bytes);
        // Any cache read error or miss falls through to the object store.
        if let Ok(Some((meta, _))) = entry.read_head().await {
            return Ok(meta);
        }
    }
    let opts = GetOptions {
        range: None,
        head: true,
        ..Default::default()
    };
    let result = self.object_store.get_opts(location, opts).await?;
    let meta = result.meta.clone();
    if self.resolve_root(location, &meta.location) {
        // Caching is best-effort; ignore save failures.
        self.save_get_result(result).await.ok();
    }
    Ok(meta)
}
/// `get_opts` with cache support.
///
/// First refreshes the head (possibly prefetching aligned parts), then serves
/// the requested range as a stream of per-part reads, each consulting the
/// cache before falling back to the object store.
pub(crate) async fn cached_get_opts(
    &self,
    location: &Path,
    opts: GetOptions,
) -> object_store::Result<GetResult> {
    let (meta, attributes) = self.maybe_prefetch_range(location, opts.clone()).await?;
    // Without a resolved cache root we cannot build cache keys; defer the
    // whole request to the underlying store.
    if self.cache_location_for(location).is_none() {
        return self.object_store.get_opts(location, opts.clone()).await;
    }
    let range = self.canonicalize_range(opts.range.clone(), meta.size)?;
    let part_reads: Vec<_> = self
        .split_range_into_parts(range.clone())
        .into_iter()
        .map(|(part_id, range_in_part)| self.read_part(location, part_id, range_in_part))
        .collect();
    let payload_stream = stream::iter(part_reads).then(|fut| fut).boxed();
    Ok(GetResult {
        meta,
        range,
        attributes,
        payload: GetResultPayload::Stream(payload_stream),
    })
}
async fn cached_put_opts(
&self,
location: &Path,
payload: object_store::PutPayload,
opts: object_store::PutOptions,
) -> object_store::Result<PutResult> {
if !self.cache_puts {
return self.object_store.put_opts(location, payload, opts).await;
}
let result = self
.object_store
.put_opts(location, payload.clone(), opts)
.await?;
let Some(cache_location) = self.cache_location_for(location) else {
return Ok(result);
};
let entry = self
.cache_storage
.entry(&cache_location, self.part_size_bytes);
if self.admission_picker.pick(entry.as_ref()).admitted() {
let stream = stream::iter(payload.into_iter()).map(Ok::<Bytes, object_store::Error>);
self.save_parts_stream(entry, stream, 0).await.ok();
}
Ok(result)
}
/// Returns the object's metadata and attributes, preferring the cached head.
///
/// On a cache miss the requested range is widened to part boundaries before
/// hitting the object store, so the response can be persisted as whole cache
/// parts. Saving is skipped when the cache root cannot be resolved against
/// the returned meta location.
async fn maybe_prefetch_range(
    &self,
    location: &Path,
    mut opts: GetOptions,
) -> object_store::Result<(ObjectMeta, Attributes)> {
    if let Some(cache_location) = self.cache_location_for(location) {
        let entry = self
            .cache_storage
            .entry(&cache_location, self.part_size_bytes);
        match entry.read_head().await {
            Ok(Some((meta, attrs))) => return Ok((meta, attrs)),
            Ok(None) => {}
            Err(e) => {
                // A corrupted/unreadable cached head is non-fatal: log and
                // refetch from the object store.
                warn!(
                    "failed to read head from disk cache, will fallback to object store [location={}, error={:?}]",
                    location, e,
                );
            }
        }
    }
    // Widen the range to part boundaries so the fetched bytes can be cached.
    if let Some(range) = &opts.range {
        opts.range = Some(self.align_get_range(range));
    }
    let get_result = self.object_store.get_opts(location, opts).await?;
    let result_meta = get_result.meta.clone();
    let result_attrs = get_result.attributes.clone();
    if self.resolve_root(location, &result_meta.location) {
        // Best-effort save: a failure only means a future cache miss.
        self.save_get_result(get_result).await.ok();
    }
    Ok((result_meta, result_attrs))
}
/// Persists a `GetResult`'s head metadata and payload into the cache and
/// returns the object's total size.
///
/// The result's range must start on a part boundary and end either on a part
/// boundary or at the end of the object (guaranteed by `align_get_range`).
/// Nothing is written when the admission picker rejects the entry.
async fn save_get_result(&self, result: GetResult) -> object_store::Result<u64> {
    let part_size_bytes_u64 = self.part_size_bytes as u64;
    // Internal invariants: violating them indicates a bug in range alignment.
    assert!(result.range.start.is_multiple_of(part_size_bytes_u64));
    assert!(
        result.range.end.is_multiple_of(part_size_bytes_u64)
            || result.range.end == result.meta.size
    );
    let entry = self
        .cache_storage
        .entry(&result.meta.location, self.part_size_bytes);
    let object_size = result.meta.size;
    if self.admission_picker.pick(entry.as_ref()).admitted() {
        entry.save_head((&result.meta, &result.attributes)).await?;
        // The conversion targets usize, so state that in the diagnostic
        // (the old message incorrectly referred to u32).
        let start_part_number = usize::try_from(result.range.start / part_size_bytes_u64)
            .expect("Part number exceeds usize on this platform. Try increasing part size.");
        let stream = result.into_stream();
        self.save_parts_stream(entry, stream, start_part_number)
            .await?;
    }
    Ok(object_size)
}
/// Buffers `stream`'s chunks and writes them into `entry` as fixed-size
/// parts starting at `start_part_number`; a trailing short remainder becomes
/// one final partial part. Returns the total number of bytes consumed.
async fn save_parts_stream<S>(
    &self,
    entry: Box<dyn LocalCacheEntry>,
    mut stream: S,
    start_part_number: usize,
) -> object_store::Result<usize>
where
    S: stream::Stream<Item = Result<Bytes, object_store::Error>> + Unpin,
{
    let mut buffer = BytesMut::new();
    let mut part_number = start_part_number;
    let mut total_bytes: usize = 0;
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        total_bytes += chunk.len();
        buffer.extend_from_slice(&chunk);
        // Flush every complete part as soon as the buffer holds one; chunk
        // boundaries of the incoming stream need not match part boundaries.
        while buffer.len() >= self.part_size_bytes {
            let to_write = buffer.split_to(self.part_size_bytes);
            entry.save_part(part_number, to_write.into()).await?;
            part_number += 1;
        }
    }
    // Whatever remains is the object's final (shorter) part.
    if !buffer.is_empty() {
        entry.save_part(part_number, buffer.into()).await?;
    }
    Ok(total_bytes)
}
/// Maps an absolute byte range to the list of (part id, in-part range) pairs
/// that cover it. Interior parts span the full part size; the first and last
/// entries are trimmed to where the requested range actually starts/ends.
fn split_range_into_parts(&self, range: Range<u64>) -> Vec<(PartID, Range<usize>)> {
    let part_size = self.part_size_bytes as u64;
    let aligned = self.align_range(&range, self.part_size_bytes);
    let first_part = aligned.start / part_size;
    let end_part = aligned.end / part_size;
    let mut parts = (first_part..end_part)
        .map(|id| {
            let id = usize::try_from(id).expect("Number of parts exceeds usize");
            (id, 0..self.part_size_bytes)
        })
        .collect::<Vec<_>>();
    if let Some((_, head_range)) = parts.first_mut() {
        // Trim the leading part to the requested start offset.
        head_range.start = usize::try_from(range.start % part_size)
            .expect("Part size is too large to fit in a usize");
    }
    if let Some((_, tail_range)) = parts.last_mut() {
        // Trim the trailing part unless the range ends on a part boundary.
        if !range.end.is_multiple_of(part_size) {
            tail_range.end = usize::try_from(range.end % part_size)
                .expect("Part size is too large to fit in a usize");
        }
    }
    parts
}
/// Returns a future that yields `range_in_part` of part `part_id` of the
/// object at `location`.
///
/// The cache is consulted first; on a miss the whole part is fetched from the
/// object store and, when the cache root resolves, written back to the cache
/// (best-effort) before the requested slice is returned.
fn read_part(
    &self,
    location: &Path,
    part_id: PartID,
    range_in_part: Range<usize>,
) -> BoxFuture<'static, object_store::Result<Bytes>> {
    let this = self.clone();
    let location = location.clone();
    Box::pin(async move {
        this.stats.object_store_cache_part_access.increment(1);
        if let Some(cache_location) = this.cache_location_for(&location) {
            let entry = this
                .cache_storage
                .entry(&cache_location, this.part_size_bytes);
            if let Ok(Some(bytes)) = entry.read_part(part_id, range_in_part.clone()).await {
                this.stats.object_store_cache_part_hits.increment(1);
                return Ok(bytes);
            }
        }
        // Cache miss: fetch the full part so it can be cached as a unit.
        // Do the offset arithmetic in u64 so `part_id * part_size` cannot
        // overflow usize on 32-bit targets before widening.
        let part_size = this.part_size_bytes as u64;
        let part_range = Range {
            start: part_id as u64 * part_size,
            end: (part_id as u64 + 1) * part_size,
        };
        let get_result = this
            .object_store
            .get_opts(
                &location,
                GetOptions {
                    range: Some(GetRange::Bounded(part_range)),
                    ..Default::default()
                },
            )
            .await?;
        let cache_entry = if this.resolve_root(&location, &get_result.meta.location) {
            this.cache_location_for(&location).map(|cache_location| {
                this.cache_storage
                    .entry(&cache_location, this.part_size_bytes)
            })
        } else {
            None
        };
        let bytes = if let Some(entry) = cache_entry {
            let meta = get_result.meta.clone();
            let attrs = get_result.attributes.clone();
            let bytes = get_result.bytes().await?;
            // Cache writes are best-effort; a failure just means a miss later.
            entry.save_head((&meta, &attrs)).await.ok();
            entry.save_part(part_id, bytes.clone()).await.ok();
            bytes
        } else {
            get_result.bytes().await?
        };
        // `Bytes::slice` is zero-copy; the previous copy_from_slice allocated
        // and copied the slice for no benefit.
        Ok(bytes.slice(range_in_part))
    })
}
fn canonicalize_range(
&self,
range: Option<GetRange>,
object_size: u64,
) -> object_store::Result<Range<u64>> {
let (start_offset, end_offset) = match range {
None => (0, object_size),
Some(range) => match range {
GetRange::Bounded(range) => {
if range.start >= object_size {
return Err(object_store::Error::Generic {
store: "cached_object_store",
source: Box::new(InvalidGetRange::StartTooLarge {
requested: range.start,
length: object_size,
}),
});
}
if range.start >= range.end {
return Err(object_store::Error::Generic {
store: "cached_object_store",
source: Box::new(InvalidGetRange::Inconsistent {
start: range.start,
end: range.end,
}),
});
}
(range.start, range.end.min(object_size))
}
GetRange::Offset(offset) => {
if offset >= object_size {
return Err(object_store::Error::Generic {
store: "cached_object_store",
source: Box::new(InvalidGetRange::StartTooLarge {
requested: offset,
length: object_size,
}),
});
}
(offset, object_size)
}
GetRange::Suffix(suffix) => (object_size.saturating_sub(suffix), object_size),
},
};
Ok(Range {
start: start_offset,
end: end_offset,
})
}
/// Widens a `GetRange` so both ends fall on part boundaries, preserving the
/// range's variant.
fn align_get_range(&self, range: &GetRange) -> GetRange {
    match range {
        GetRange::Bounded(bounded) => {
            GetRange::Bounded(self.align_range(bounded, self.part_size_bytes))
        }
        GetRange::Suffix(suffix) => {
            // Round the suffix length up to a whole number of parts.
            let aligned = self.align_range(&(0..*suffix), self.part_size_bytes).end;
            GetRange::Suffix(aligned)
        }
        GetRange::Offset(offset) => {
            // Round the offset down to the enclosing part boundary.
            let offset = *offset;
            GetRange::Offset(offset - offset % self.part_size_bytes as u64)
        }
    }
}
/// Expands `range` so the start rounds down and the end rounds up to the
/// given `alignment`.
fn align_range(&self, range: &Range<u64>, alignment: usize) -> Range<u64> {
    let alignment = alignment as u64;
    let start = (range.start / alignment) * alignment;
    let end = range.end.div_ceil(alignment) * alignment;
    start..end
}
}
// Human-readable form: `CachedObjectStore(<inner store>, <cache storage>)`.
impl std::fmt::Display for CachedObjectStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "CachedObjectStore({}, {})",
            self.object_store, self.cache_storage
        )
    }
}
// `ObjectStore` facade: reads (`get_opts`/`head`) and writes (`put_opts`) go
// through the caching paths; every other operation delegates straight to the
// wrapped store and does not interact with the cache.
#[async_trait::async_trait]
impl ObjectStore for CachedObjectStore {
    async fn get_opts(
        &self,
        location: &Path,
        options: GetOptions,
    ) -> object_store::Result<GetResult> {
        self.cached_get_opts(location, options).await
    }
    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
        self.cached_head(location).await
    }
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> object_store::Result<PutResult> {
        self.cached_put_opts(location, payload, opts).await
    }
    // Multipart uploads bypass the cache entirely.
    async fn put_multipart(
        &self,
        location: &Path,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        self.object_store.put_multipart(location).await
    }
    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOptions,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        self.object_store.put_multipart_opts(location, opts).await
    }
    // NOTE(review): delete does not invalidate any cached parts for the
    // deleted location — presumably handled by eviction; confirm upstream.
    async fn delete(&self, location: &Path) -> object_store::Result<()> {
        self.object_store.delete(location).await
    }
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        self.object_store.list(prefix)
    }
    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        self.object_store.list_with_offset(prefix, offset)
    }
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
        self.object_store.list_with_delimiter(prefix).await
    }
    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.object_store.copy(from, to).await
    }
    async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.object_store.rename(from, to).await
    }
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.object_store.copy_if_not_exists(from, to).await
    }
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.object_store.rename_if_not_exists(from, to).await
    }
}
/// Range-validation errors produced by `CachedObjectStore::canonicalize_range`.
#[derive(Debug, thiserror::Error)]
pub(crate) enum InvalidGetRange {
    // The range starts at or beyond the end of the object.
    #[error("Range start too large, requested: {requested}, length: {length}")]
    StartTooLarge { requested: u64, length: u64 },
    // The range's start is not strictly before its end.
    #[error("Range started at {start} and ended at {end}")]
    Inconsistent { start: u64, end: u64 },
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use bytes::Bytes;
use object_store::{path::Path, GetOptions, GetRange, ObjectStore, PutPayload};
use rand::Rng;
use super::CachedObjectStore;
use crate::cached_object_store::stats::CachedObjectStoreStats;
use crate::cached_object_store::storage::{LocalCacheStorage, PartID};
use crate::cached_object_store::storage_fs::FsCacheEntry;
use crate::cached_object_store::storage_fs::FsCacheStorage;
use crate::rand::DbRand;
use crate::test_utils::gen_rand_bytes;
use slatedb_common::clock::DefaultSystemClock;
use slatedb_common::metrics::MetricsRecorderHelper;
/// Creates a unique scratch directory path under /tmp for one test's cache
/// storage, removing any leftovers from a previous run with the same name.
fn new_test_cache_folder() -> std::path::PathBuf {
    let mut rng = rand::rng();
    let suffix: String = std::iter::repeat_with(|| rng.sample(rand::distr::Alphanumeric) as char)
        .take(10)
        .collect();
    let path = format!("/tmp/testcache-{}", suffix);
    let _ = std::fs::remove_dir_all(&path);
    std::path::PathBuf::from(path)
}
/// Test double that delegates to an inner store but rewrites the meta
/// location of every `get_opts` response, simulating a store whose reported
/// paths disagree with the requested ones.
#[derive(Debug)]
struct MismatchedMetaStore {
    inner: Arc<dyn ObjectStore>,
    // Location substituted into each get response's `ObjectMeta`.
    bad_location: Path,
}
impl MismatchedMetaStore {
    /// Wraps `inner`, forcing every get response to report `bad_location`.
    fn new(inner: Arc<dyn ObjectStore>, bad_location: Path) -> Self {
        Self {
            inner,
            bad_location,
        }
    }
}
// Display delegates to the inner store's representation.
impl std::fmt::Display for MismatchedMetaStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "MismatchedMetaStore({})", self.inner)
    }
}
// Every method delegates to `inner` unchanged, except `get_opts`, which
// overwrites the returned meta location with `bad_location`.
#[async_trait::async_trait]
impl ObjectStore for MismatchedMetaStore {
    async fn get_opts(
        &self,
        location: &Path,
        options: GetOptions,
    ) -> object_store::Result<object_store::GetResult> {
        let mut result = self.inner.get_opts(location, options).await?;
        // The intentional mismatch: callers see a meta location unrelated to
        // what they requested.
        result.meta.location = self.bad_location.clone();
        Ok(result)
    }
    async fn head(&self, location: &Path) -> object_store::Result<object_store::ObjectMeta> {
        self.inner.head(location).await
    }
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: object_store::PutOptions,
    ) -> object_store::Result<object_store::PutResult> {
        self.inner.put_opts(location, payload, opts).await
    }
    async fn put_multipart(
        &self,
        location: &Path,
    ) -> object_store::Result<Box<dyn object_store::MultipartUpload>> {
        self.inner.put_multipart(location).await
    }
    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: object_store::PutMultipartOptions,
    ) -> object_store::Result<Box<dyn object_store::MultipartUpload>> {
        self.inner.put_multipart_opts(location, opts).await
    }
    async fn delete(&self, location: &Path) -> object_store::Result<()> {
        self.inner.delete(location).await
    }
    fn list(
        &self,
        prefix: Option<&Path>,
    ) -> futures::stream::BoxStream<'static, object_store::Result<object_store::ObjectMeta>>
    {
        self.inner.list(prefix)
    }
    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> futures::stream::BoxStream<'static, object_store::Result<object_store::ObjectMeta>>
    {
        self.inner.list_with_offset(prefix, offset)
    }
    async fn list_with_delimiter(
        &self,
        prefix: Option<&Path>,
    ) -> object_store::Result<object_store::ListResult> {
        self.inner.list_with_delimiter(prefix).await
    }
    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.inner.copy(from, to).await
    }
    async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.inner.rename(from, to).await
    }
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.inner.copy_if_not_exists(from, to).await
    }
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.inner.rename_if_not_exists(from, to).await
    }
}
// `infer_root` must strip the requested suffix on a '/' boundary and reject
// meta locations that don't end with the requested path cleanly.
#[test]
fn test_infer_root() {
    assert_eq!(
        CachedObjectStore::infer_root(
            &Path::from("manifest/0001.manifest"),
            &Path::from("tenant-a/manifest/0001.manifest")
        ),
        Some(Path::from("tenant-a"))
    );
    // Meta location does not end with the requested path at all.
    assert_eq!(
        CachedObjectStore::infer_root(
            &Path::from("manifest/0001.manifest"),
            &Path::from("other/path")
        ),
        None
    );
    // Suffix matches textually but not on a path-component boundary.
    assert_eq!(
        CachedObjectStore::infer_root(&Path::from("b/c"), &Path::from("ab/c")),
        None
    );
}
// Verifies that the cache root is resolved lazily from the first get's meta
// location when the store is wrapped in a `PrefixStore`, that parts land
// under the fully-qualified key, and that subsequent reads are served from
// cache even after the backing object is deleted.
#[tokio::test]
async fn test_lazy_resolve_root_from_meta_location() -> object_store::Result<()> {
    let backing_store: Arc<dyn ObjectStore> = Arc::new(object_store::memory::InMemory::new());
    let recorder = MetricsRecorderHelper::noop();
    let stats = Arc::new(CachedObjectStoreStats::new(&recorder));
    let cache_storage = Arc::new(FsCacheStorage::new(
        new_test_cache_folder(),
        None,
        None,
        stats.clone(),
        Arc::new(DefaultSystemClock::new()),
        Arc::new(DbRand::default()),
    ));
    let prefixed: Arc<dyn ObjectStore> = Arc::new(object_store::prefix::PrefixStore::new(
        backing_store.clone(),
        Path::from("tenant-a"),
    ));
    let cached_store =
        CachedObjectStore::new(prefixed, cache_storage, 1024, false, stats).unwrap();
    let relative_location = Path::from("manifest/0001.manifest");
    let full_location = Path::from("tenant-a/manifest/0001.manifest");
    let payload = Bytes::from_static(b"tenant-a-manifest");
    backing_store
        .put(&full_location, PutPayload::from_bytes(payload.clone()))
        .await?;
    // The root is unknown until the first request observes a meta location.
    assert_eq!(cached_store.resolved_root.get().cloned(), None);
    let got = cached_store
        .cached_get_opts(&relative_location, GetOptions::default())
        .await?
        .bytes()
        .await?;
    assert_eq!(got, payload);
    assert_eq!(
        cached_store.resolved_root.get().cloned(),
        Some(Path::from("tenant-a"))
    );
    // Parts must be cached under the prefixed key, not the relative one.
    let scoped_entry = cached_store.cache_storage.entry(&full_location, 1024);
    assert_eq!(scoped_entry.cached_parts().await?.len(), 1);
    let unscoped_entry = cached_store.cache_storage.entry(&relative_location, 1024);
    assert_eq!(unscoped_entry.cached_parts().await?.len(), 0);
    // After deleting the backing object, the cached copy still serves reads.
    backing_store.delete(&full_location).await?;
    let got_cached = cached_store
        .cached_get_opts(&relative_location, GetOptions::default())
        .await?
        .bytes()
        .await?;
    assert_eq!(got_cached, payload);
    Ok(())
}
// Two prefix stores sharing one cache storage must resolve different roots
// and cache under distinct fully-qualified keys, so identical relative paths
// never collide.
#[tokio::test]
async fn test_shared_cache_with_prefix_stores_does_not_collide() -> object_store::Result<()> {
    let backing_store: Arc<dyn ObjectStore> = Arc::new(object_store::memory::InMemory::new());
    let cache_storage = Arc::new(FsCacheStorage::new(
        new_test_cache_folder(),
        None,
        None,
        {
            let recorder = MetricsRecorderHelper::noop();
            Arc::new(CachedObjectStoreStats::new(&recorder))
        },
        Arc::new(DefaultSystemClock::new()),
        Arc::new(DbRand::default()),
    ));
    let store_a: Arc<dyn ObjectStore> = Arc::new(object_store::prefix::PrefixStore::new(
        backing_store.clone(),
        Path::from("db-a"),
    ));
    let store_b: Arc<dyn ObjectStore> = Arc::new(object_store::prefix::PrefixStore::new(
        backing_store.clone(),
        Path::from("db-b"),
    ));
    let cached_a = CachedObjectStore::new(store_a, cache_storage.clone(), 1024, false, {
        let recorder = MetricsRecorderHelper::noop();
        Arc::new(CachedObjectStoreStats::new(&recorder))
    })
    .unwrap();
    let cached_b = CachedObjectStore::new(store_b, cache_storage.clone(), 1024, false, {
        let recorder = MetricsRecorderHelper::noop();
        Arc::new(CachedObjectStoreStats::new(&recorder))
    })
    .unwrap();
    // Same relative path in both tenants, different payloads.
    let relative = Path::from("manifest/0001.manifest");
    let full_a = Path::from("db-a/manifest/0001.manifest");
    let full_b = Path::from("db-b/manifest/0001.manifest");
    let payload_a = Bytes::from_static(b"tenant-a-data");
    let payload_b = Bytes::from_static(b"tenant-b-data");
    backing_store
        .put(&full_a, PutPayload::from_bytes(payload_a.clone()))
        .await?;
    backing_store
        .put(&full_b, PutPayload::from_bytes(payload_b.clone()))
        .await?;
    let got_a = cached_a
        .cached_get_opts(&relative, GetOptions::default())
        .await?
        .bytes()
        .await?;
    let got_b = cached_b
        .cached_get_opts(&relative, GetOptions::default())
        .await?
        .bytes()
        .await?;
    assert_eq!(got_a, payload_a);
    assert_eq!(got_b, payload_b);
    // Each store resolved its own prefix as the cache root.
    assert_eq!(
        cached_a.resolved_root.get().cloned(),
        Some(Path::from("db-a"))
    );
    assert_eq!(
        cached_b.resolved_root.get().cloned(),
        Some(Path::from("db-b"))
    );
    // No entry under the bare relative key; one part under each full key.
    let unscoped_entry = cache_storage.entry(&relative, 1024);
    assert_eq!(unscoped_entry.cached_parts().await?.len(), 0);
    let scoped_a = cache_storage.entry(&full_a, 1024);
    let scoped_b = cache_storage.entry(&full_b, 1024);
    assert_eq!(scoped_a.cached_parts().await?.len(), 1);
    assert_eq!(scoped_b.cached_parts().await?.len(), 1);
    // Cached copies keep serving the correct per-tenant data after deletion.
    backing_store.delete(&full_a).await?;
    backing_store.delete(&full_b).await?;
    let got_cached_a = cached_a
        .cached_get_opts(&relative, GetOptions::default())
        .await?
        .bytes()
        .await?;
    let got_cached_b = cached_b
        .cached_get_opts(&relative, GetOptions::default())
        .await?
        .bytes()
        .await?;
    assert_eq!(got_cached_a, payload_a);
    assert_eq!(got_cached_b, payload_b);
    Ok(())
}
// When the store reports a meta location that cannot be reconciled with the
// requested path, root resolution must fail and nothing may be cached — but
// reads must still succeed by bypassing the cache.
#[tokio::test]
async fn test_meta_location_mismatch_bypasses_cache() -> object_store::Result<()> {
    let backing_store: Arc<dyn ObjectStore> = Arc::new(object_store::memory::InMemory::new());
    let bad_meta_store: Arc<dyn ObjectStore> = Arc::new(MismatchedMetaStore::new(
        backing_store.clone(),
        Path::from("wrong/root"),
    ));
    let recorder = MetricsRecorderHelper::noop();
    let stats = Arc::new(CachedObjectStoreStats::new(&recorder));
    let cache_storage = Arc::new(FsCacheStorage::new(
        new_test_cache_folder(),
        None,
        None,
        stats.clone(),
        Arc::new(DefaultSystemClock::new()),
        Arc::new(DbRand::default()),
    ));
    let cached_store =
        CachedObjectStore::new(bad_meta_store, cache_storage, 1024, false, stats).unwrap();
    let location = Path::from("data/file.sst");
    let payload = Bytes::from_static(b"payload");
    backing_store
        .put(&location, PutPayload::from_bytes(payload.clone()))
        .await?;
    assert_eq!(cached_store.resolved_root.get().cloned(), None);
    let got = cached_store
        .cached_get_opts(&location, GetOptions::default())
        .await?
        .bytes()
        .await?;
    assert_eq!(got, payload);
    // Root stays unresolved and the cache stays empty.
    assert_eq!(cached_store.resolved_root.get().cloned(), None);
    let entry = cached_store.cache_storage.entry(&location, 1024);
    assert_eq!(entry.cached_parts().await?.len(), 0);
    Ok(())
}
// `save_get_result` on a payload that is not a multiple of the part size must
// write full parts plus one short trailing part, and evicted part files must
// show up as missing in `read_part`/`cached_parts`.
#[tokio::test]
async fn test_save_result_not_aligned() -> object_store::Result<()> {
    // 3 full 1 KiB parts + a 32-byte remainder.
    let payload = gen_rand_bytes(1024 * 3 + 32);
    let object_store = Arc::new(object_store::memory::InMemory::new());
    let test_cache_folder = new_test_cache_folder();
    let recorder = MetricsRecorderHelper::noop();
    let stats = Arc::new(CachedObjectStoreStats::new(&recorder));
    object_store
        .put(
            &Path::from("/data/testfile1"),
            PutPayload::from_bytes(payload.clone()),
        )
        .await?;
    let location = Path::from("/data/testfile1");
    let get_result = object_store.get(&location).await?;
    let cache_storage = Arc::new(FsCacheStorage::new(
        test_cache_folder.clone(),
        None,
        None,
        stats.clone(),
        Arc::new(DefaultSystemClock::new()),
        Arc::new(DbRand::default()),
    ));
    let part_size = 1024;
    let cached_store =
        CachedObjectStore::new(object_store.clone(), cache_storage, part_size, false, stats)
            .unwrap();
    let entry = cached_store.cache_storage.entry(&location, 1024);
    let object_size_hint = cached_store.save_get_result(get_result).await?;
    assert_eq!(object_size_hint, 1024 * 3 + 32);
    let head = entry.read_head().await?;
    assert_eq!(head.unwrap().0.size, 1024 * 3 + 32);
    let cached_parts = entry.cached_parts().await?;
    assert_eq!(cached_parts.len(), 4);
    assert_eq!(
        entry.read_part(0, 0..part_size).await?,
        Some(payload.slice(0..1024))
    );
    assert_eq!(
        entry.read_part(1, 0..part_size).await?,
        Some(payload.slice(1024..2048))
    );
    assert_eq!(
        entry.read_part(2, 0..part_size).await?,
        Some(payload.slice(2048..3072))
    );
    // The final part holds only the 32-byte remainder.
    assert_eq!(
        entry.read_part(3, 0..32).await?,
        Some(payload.slice(3072..3104))
    );
    // Simulate eviction of part 2 by removing its file directly.
    let evict_part_path =
        FsCacheEntry::make_part_path(test_cache_folder.clone(), &location, 2, 1024);
    std::fs::remove_file(evict_part_path).unwrap();
    assert_eq!(entry.read_part(2, 0..part_size).await?, None);
    let cached_parts = entry.cached_parts().await?;
    assert_eq!(cached_parts, vec![0, 1, 3]);
    // Evict the short trailing part as well.
    let evict_part_path =
        FsCacheEntry::make_part_path(test_cache_folder.clone(), &location, 3, 1024);
    std::fs::remove_file(evict_part_path).unwrap();
    assert_eq!(entry.read_part(3, 0..part_size).await?, None);
    let cached_parts = entry.cached_parts().await?;
    assert_eq!(cached_parts, vec![0, 1]);
    Ok(())
}
// `save_get_result` on a payload that is an exact multiple of the part size
// must produce exactly that many full parts and no trailing partial part.
#[tokio::test]
async fn test_save_result_aligned() -> object_store::Result<()> {
    // Exactly 3 full 1 KiB parts.
    let payload = gen_rand_bytes(1024 * 3);
    let object_store = Arc::new(object_store::memory::InMemory::new());
    let test_cache_folder = new_test_cache_folder();
    let recorder = MetricsRecorderHelper::noop();
    let stats = Arc::new(CachedObjectStoreStats::new(&recorder));
    object_store
        .put(
            &Path::from("/data/testfile1"),
            PutPayload::from_bytes(payload.clone()),
        )
        .await?;
    let location = Path::from("/data/testfile1");
    let get_result = object_store.get(&location).await?;
    let part_size = 1024;
    let cache_storage = Arc::new(FsCacheStorage::new(
        test_cache_folder.clone(),
        None,
        None,
        stats.clone(),
        Arc::new(DefaultSystemClock::new()),
        Arc::new(DbRand::default()),
    ));
    let cached_store =
        CachedObjectStore::new(object_store, cache_storage, part_size, false, stats).unwrap();
    let entry = cached_store.cache_storage.entry(&location, part_size);
    let object_size_hint = cached_store.save_get_result(get_result).await?;
    assert_eq!(object_size_hint, 1024 * 3);
    let cached_parts = entry.cached_parts().await?;
    assert_eq!(cached_parts.len(), 3);
    assert_eq!(
        entry.read_part(0, 0..part_size).await?,
        Some(payload.slice(0..1024))
    );
    assert_eq!(
        entry.read_part(1, 0..part_size).await?,
        Some(payload.slice(1024..2048))
    );
    assert_eq!(
        entry.read_part(2, 0..part_size).await?,
        Some(payload.slice(2048..3072))
    );
    // Simulate eviction of part 2 by removing its file directly.
    let evict_part_path =
        FsCacheEntry::make_part_path(test_cache_folder.clone(), &location, 2, part_size);
    std::fs::remove_file(evict_part_path).unwrap();
    assert_eq!(entry.read_part(2, 0..part_size).await?, None);
    let cached_parts = entry.cached_parts().await?;
    assert_eq!(cached_parts.len(), 2);
    Ok(())
}
// Table-driven check of `canonicalize_range` + `split_range_into_parts`
// across all `GetRange` variants, covering aligned, unaligned, and boundary
// ranges with a 1024-byte part size.
#[test]
fn test_split_range_into_parts() {
    let object_store = Arc::new(object_store::memory::InMemory::new());
    let test_cache_folder = new_test_cache_folder();
    let recorder = MetricsRecorderHelper::noop();
    let stats = Arc::new(CachedObjectStoreStats::new(&recorder));
    let cache_storage = Arc::new(FsCacheStorage::new(
        test_cache_folder,
        None,
        None,
        stats.clone(),
        Arc::new(DefaultSystemClock::new()),
        Arc::new(DbRand::default()),
    ));
    let cached_store =
        CachedObjectStore::new(object_store, cache_storage, 1024, false, stats).unwrap();
    // Each case: (requested range, object size) -> expected (part id, in-part range) pairs.
    struct Test {
        input: (Option<GetRange>, usize),
        expect: Vec<(PartID, std::ops::Range<usize>)>,
    }
    let tests = [
        Test {
            input: (None, 1024 * 3),
            expect: vec![(0, 0..1024), (1, 0..1024), (2, 0..1024)],
        },
        Test {
            input: (None, 1024 * 3 + 12),
            expect: vec![(0, 0..1024), (1, 0..1024), (2, 0..1024), (3, 0..12)],
        },
        Test {
            input: (None, 12),
            expect: vec![(0, 0..12)],
        },
        Test {
            input: (Some(GetRange::Bounded(0..1024)), 1024),
            expect: vec![(0, 0..1024)],
        },
        Test {
            input: (Some(GetRange::Bounded(128..1024)), 20000),
            expect: vec![(0, 128..1024)],
        },
        Test {
            input: (Some(GetRange::Bounded(128..1024 + 12)), 20000),
            expect: vec![(0, 128..1024), (1, 0..12)],
        },
        Test {
            input: (Some(GetRange::Bounded(128..1024 * 2 + 12)), 20000),
            expect: vec![(0, 128..1024), (1, 0..1024), (2, 0..12)],
        },
        Test {
            input: (Some(GetRange::Bounded(1024 * 2..1024 * 3 + 12)), 200000),
            expect: vec![(2, 0..1024), (3, 0..12)],
        },
        Test {
            input: (Some(GetRange::Bounded(1024 * 2 - 2..1024 * 3 + 12)), 20000),
            expect: vec![(1, 1022..1024), (2, 0..1024), (3, 0..12)],
        },
        Test {
            input: (Some(GetRange::Suffix(128)), 1024),
            expect: vec![(0, 896..1024)],
        },
        Test {
            input: (Some(GetRange::Suffix(1024 * 2 + 8)), 1024 * 4),
            expect: vec![(1, 1016..1024), (2, 0..1024), (3, 0..1024)],
        },
        Test {
            input: (Some(GetRange::Offset(8)), 1024 * 4),
            expect: vec![(0, 8..1024), (1, 0..1024), (2, 0..1024), (3, 0..1024)],
        },
        Test {
            input: (Some(GetRange::Offset(1024 * 2 + 8)), 1024 * 4),
            expect: vec![(2, 8..1024), (3, 0..1024)],
        },
        Test {
            input: (Some(GetRange::Offset(1024 * 2 + 8)), 1024 * 4 + 2),
            expect: vec![(2, 8..1024), (3, 0..1024), (4, 0..2)],
        },
    ];
    for t in tests.iter() {
        let range = cached_store
            .canonicalize_range(t.input.0.clone(), t.input.1 as u64)
            .unwrap();
        let parts = cached_store.split_range_into_parts(range);
        assert_eq!(parts, t.expect, "input: {:?}", t.input);
    }
}
// `align_range` rounds the start down and the end up to the alignment.
#[test]
fn test_align_range() {
    let object_store = Arc::new(object_store::memory::InMemory::new());
    let test_cache_folder = new_test_cache_folder();
    let recorder = MetricsRecorderHelper::noop();
    let stats = Arc::new(CachedObjectStoreStats::new(&recorder));
    let cache_storage = Arc::new(FsCacheStorage::new(
        test_cache_folder,
        None,
        None,
        stats.clone(),
        Arc::new(DefaultSystemClock::new()),
        Arc::new(DbRand::default()),
    ));
    let cached_store =
        CachedObjectStore::new(object_store, cache_storage, 1024, false, stats).unwrap();
    let aligned = cached_store.align_range(&(9..1025), 1024);
    assert_eq!(aligned, 0..2048);
    let aligned = cached_store.align_range(&(1024 + 1..2048 + 4), 1024);
    assert_eq!(aligned, 1024..3072);
}
#[test]
fn test_align_get_range() {
    // Build a cached store with a 1 KiB part size over an in-memory backend.
    let backing = Arc::new(object_store::memory::InMemory::new());
    let cache_dir = new_test_cache_folder();
    let recorder = MetricsRecorderHelper::noop();
    let stats = Arc::new(CachedObjectStoreStats::new(&recorder));
    let storage = Arc::new(FsCacheStorage::new(
        cache_dir,
        None,
        None,
        stats.clone(),
        Arc::new(DefaultSystemClock::new()),
        Arc::new(DbRand::default()),
    ));
    let store = CachedObjectStore::new(backing, storage, 1024, false, stats).unwrap();

    // Bounded ranges snap outward to whole-part boundaries.
    assert_eq!(
        store.align_get_range(&GetRange::Bounded(9..1025)),
        GetRange::Bounded(0..2048)
    );
    assert_eq!(
        store.align_get_range(&GetRange::Bounded(9..2048)),
        GetRange::Bounded(0..2048)
    );
    // Suffix lengths round up to a whole number of parts.
    assert_eq!(
        store.align_get_range(&GetRange::Suffix(12)),
        GetRange::Suffix(1024)
    );
    assert_eq!(
        store.align_get_range(&GetRange::Suffix(1024)),
        GetRange::Suffix(1024)
    );
    // Offsets round down to the start of their containing part.
    assert_eq!(
        store.align_get_range(&GetRange::Offset(1024)),
        GetRange::Offset(1024)
    );
    assert_eq!(
        store.align_get_range(&GetRange::Offset(12)),
        GetRange::Offset(0)
    );
}
#[tokio::test]
async fn test_cached_object_store_impl_object_store() -> object_store::Result<()> {
let object_store = Arc::new(object_store::memory::InMemory::new());
let test_cache_folder = new_test_cache_folder();
let recorder = MetricsRecorderHelper::noop();
let stats = Arc::new(CachedObjectStoreStats::new(&recorder));
let cache_storage = Arc::new(FsCacheStorage::new(
test_cache_folder.clone(),
None,
None,
stats.clone(),
Arc::new(DefaultSystemClock::new()),
Arc::new(DbRand::default()),
));
let cached_store =
CachedObjectStore::new(object_store.clone(), cache_storage, 1024, false, stats)
.unwrap();
let test_path = Path::from("/data/testdata1");
let test_payload = gen_rand_bytes(1024 * 3 + 2);
object_store
.put(&test_path, PutPayload::from_bytes(test_payload.clone()))
.await?;
let test_ranges = vec![
Some(GetRange::Offset(260817)),
None,
Some(GetRange::Bounded(1000..2048)),
Some(GetRange::Bounded(1000..260817)),
Some(GetRange::Suffix(10)),
Some(GetRange::Suffix(260817)),
Some(GetRange::Offset(1000)),
Some(GetRange::Offset(0)),
Some(GetRange::Offset(1028)),
Some(GetRange::Offset(260817)),
Some(GetRange::Offset(1024 * 3 + 2)),
Some(GetRange::Offset(1024 * 3 + 1)),
#[allow(clippy::reversed_empty_ranges)]
Some(GetRange::Bounded(2900..2048)),
Some(GetRange::Bounded(10..10)),
];
for range in test_ranges.iter() {
let want = object_store
.get_opts(
&test_path,
GetOptions {
range: range.clone(),
..Default::default()
},
)
.await;
let got = cached_store
.cached_get_opts(
&test_path,
GetOptions {
range: range.clone(),
..Default::default()
},
)
.await;
match (want, got) {
(Ok(want), Ok(got)) => {
assert_eq!(want.range, got.range);
assert_eq!(want.meta, got.meta);
assert_eq!(want.bytes().await?, got.bytes().await?);
}
(Err(want), Err(got)) => {
if want.to_string().to_lowercase().contains("range") {
assert!(got.to_string().to_lowercase().contains("range"));
}
}
(origin_result, cached_result) => {
panic!("expect: {:?}, got: {:?}", origin_result, cached_result);
}
}
}
Ok(())
}
#[tokio::test]
async fn test_preload_cache() {
    // With a generous byte budget, preloading should fully cache every file.
    let cache_dir = new_test_cache_folder();
    let recorder = MetricsRecorderHelper::noop();
    let stats = Arc::new(CachedObjectStoreStats::new(&recorder));
    let storage = Arc::new(FsCacheStorage::new(
        cache_dir,
        Some(10 * 1024 * 1024),
        None,
        stats.clone(),
        Arc::new(DefaultSystemClock::new()),
        Arc::new(DbRand::default()),
    ));
    let backing = Arc::new(object_store::memory::InMemory::new());
    let store = CachedObjectStore::new(backing.clone(), storage, 1024, true, stats).unwrap();

    // Seed three 2 KiB objects (= two 1 KiB parts each) into the backing store.
    let paths = vec![
        Path::from("file1.sst"),
        Path::from("file2.sst"),
        Path::from("file3.sst"),
    ];
    let payload = gen_rand_bytes(2048);
    for p in &paths {
        backing
            .put(p, PutPayload::from_bytes(payload.clone()))
            .await
            .unwrap();
    }

    store
        .load_files_to_cache(paths.clone(), 10 * 1024)
        .await
        .unwrap();

    // Each 2048-byte object splits into exactly two cached parts.
    for p in &paths {
        let entry = store.cache_storage.entry(p, 1024);
        let parts = entry.cached_parts().await.unwrap();
        assert_eq!(parts.len(), 2);
    }
}
#[tokio::test]
async fn test_preload_cache_above_limit() {
    // With a zero byte budget, preloading must not cache any parts at all.
    let cache_dir = new_test_cache_folder();
    let recorder = MetricsRecorderHelper::noop();
    let stats = Arc::new(CachedObjectStoreStats::new(&recorder));
    let storage = Arc::new(FsCacheStorage::new(
        cache_dir,
        Some(10 * 1024 * 1024),
        None,
        stats.clone(),
        Arc::new(DefaultSystemClock::new()),
        Arc::new(DbRand::default()),
    ));
    let backing = Arc::new(object_store::memory::InMemory::new());
    let store = CachedObjectStore::new(backing.clone(), storage, 1024, true, stats).unwrap();

    // Seed two 2 KiB objects into the backing store.
    let paths = vec![Path::from("file1.sst"), Path::from("file2.sst")];
    let payload = gen_rand_bytes(2048);
    for p in &paths {
        backing
            .put(p, PutPayload::from_bytes(payload.clone()))
            .await
            .unwrap();
    }

    // Budget of 0 bytes: the loader should admit nothing.
    store
        .load_files_to_cache(paths.clone(), 0)
        .await
        .unwrap();

    for p in &paths {
        let entry = store.cache_storage.entry(p, 1024);
        let parts = entry.cached_parts().await.unwrap();
        assert_eq!(parts.len(), 0);
    }
}
}