use crate::cached_object_store::stats::CachedObjectStoreStats;
use bytes::{Bytes, BytesMut};
use futures::{future::BoxFuture, stream, stream::BoxStream, StreamExt};
use object_store::{path::Path, GetOptions, GetResult, ObjectMeta, ObjectStore};
use object_store::{Attributes, GetRange, GetResultPayload, PutResult};
use object_store::{ListResult, MultipartUpload, PutMultipartOpts, PutOptions, PutPayload};
use std::{ops::Range, sync::Arc};
use crate::cached_object_store::storage::{LocalCacheStorage, PartID};
use crate::error::SlateDBError;
#[derive(Debug, Clone)]
pub(crate) struct CachedObjectStore {
object_store: Arc<dyn ObjectStore>,
pub(crate) part_size_bytes: usize, pub(crate) cache_storage: Arc<dyn LocalCacheStorage>,
stats: Arc<CachedObjectStoreStats>,
}
impl CachedObjectStore {
pub fn new(
object_store: Arc<dyn ObjectStore>,
cache_storage: Arc<dyn LocalCacheStorage>,
part_size_bytes: usize,
stats: Arc<CachedObjectStoreStats>,
) -> Result<Arc<Self>, SlateDBError> {
if part_size_bytes == 0 || part_size_bytes % 1024 != 0 {
return Err(SlateDBError::InvalidCachePartSize);
}
Ok(Arc::new(Self {
object_store,
part_size_bytes,
cache_storage,
stats,
}))
}
pub async fn start_evictor(&self) {
self.cache_storage.start_evictor().await;
}
pub async fn cached_head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
let entry = self.cache_storage.entry(location, self.part_size_bytes);
match entry.read_head().await {
Ok(Some((meta, _))) => Ok(meta),
_ => {
let result = self
.object_store
.get_opts(
location,
GetOptions {
range: None,
head: true,
..Default::default()
},
)
.await?;
let meta = result.meta.clone();
self.save_result(result).await.ok();
Ok(meta)
}
}
}
pub async fn cached_get_opts(
&self,
location: &Path,
opts: GetOptions,
) -> object_store::Result<GetResult> {
let get_range = opts.range.clone();
let (meta, attributes) = self.maybe_prefetch_range(location, opts).await?;
let range = self.canonicalize_range(get_range, meta.size)?;
let parts = self.split_range_into_parts(range.clone());
let futures = parts
.into_iter()
.map(|(part_id, range_in_part)| self.read_part(location, part_id, range_in_part))
.collect::<Vec<_>>();
let result_stream = stream::iter(futures).then(|fut| fut).boxed();
Ok(GetResult {
meta,
range,
attributes,
payload: GetResultPayload::Stream(result_stream),
})
}
#[allow(unused)]
async fn cached_put_opts(
&self,
location: &Path,
payload: object_store::PutPayload,
opts: object_store::PutOptions,
) -> object_store::Result<PutResult> {
self.object_store.put_opts(location, payload, opts).await
}
async fn maybe_prefetch_range(
&self,
location: &Path,
mut opts: GetOptions,
) -> object_store::Result<(ObjectMeta, Attributes)> {
let entry = self.cache_storage.entry(location, self.part_size_bytes);
match entry.read_head().await {
Ok(Some((meta, attrs))) => return Ok((meta, attrs)),
Ok(None) => {}
Err(_) => {
}
};
if let Some(range) = &opts.range {
opts.range = Some(self.align_get_range(range));
}
let get_result = self.object_store.get_opts(location, opts).await?;
let result_meta = get_result.meta.clone();
let result_attrs = get_result.attributes.clone();
self.save_result(get_result).await.ok();
Ok((result_meta, result_attrs))
}
async fn save_result(&self, result: GetResult) -> object_store::Result<u64> {
let part_size_bytes_u64 = self.part_size_bytes as u64;
assert!(result.range.start % part_size_bytes_u64 == 0);
assert!(
result.range.end % part_size_bytes_u64 == 0 || result.range.end == result.meta.size
);
let entry = self
.cache_storage
.entry(&result.meta.location, self.part_size_bytes);
entry.save_head((&result.meta, &result.attributes)).await?;
let mut buffer = BytesMut::new();
let mut part_number = usize::try_from(result.range.start / part_size_bytes_u64)
.expect("Part number exceeds u32 on a 32-bit system. Try increasing part size.");
let object_size = result.meta.size;
let mut stream = result.into_stream();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
buffer.extend_from_slice(&chunk);
while buffer.len() >= self.part_size_bytes {
let to_write = buffer.split_to(self.part_size_bytes);
entry.save_part(part_number, to_write.into()).await?;
part_number += 1;
}
}
if !buffer.is_empty() {
entry.save_part(part_number, buffer.into()).await?;
return Ok(object_size);
}
Ok(object_size)
}
fn split_range_into_parts(&self, range: Range<u64>) -> Vec<(PartID, Range<usize>)> {
let part_size_bytes_u64 = self.part_size_bytes as u64;
let range_aligned = self.align_range(&range, self.part_size_bytes);
let start_part = range_aligned.start / part_size_bytes_u64;
let end_part = range_aligned.end / part_size_bytes_u64;
let mut parts: Vec<_> = (start_part..end_part)
.map(|part_id| {
(
usize::try_from(part_id).expect("Number of parts exceeds usize"),
Range {
start: 0,
end: self.part_size_bytes,
},
)
})
.collect();
if parts.is_empty() {
return vec![];
}
if let Some(first_part) = parts.first_mut() {
first_part.1.start = usize::try_from(range.start % part_size_bytes_u64)
.expect("Part size is too large to fit in a usize");
}
if let Some(last_part) = parts.last_mut() {
if range.end % part_size_bytes_u64 != 0 {
last_part.1.end = usize::try_from(range.end % part_size_bytes_u64)
.expect("Part size is too large to fit in a usize");
}
}
parts
}
fn read_part(
&self,
location: &Path,
part_id: PartID,
range_in_part: Range<usize>,
) -> BoxFuture<'static, object_store::Result<Bytes>> {
let part_size = self.part_size_bytes;
let object_store = self.object_store.clone();
let location = location.clone();
let entry = self.cache_storage.entry(&location, self.part_size_bytes);
let db_stats = self.stats.clone();
Box::pin(async move {
db_stats.object_store_cache_part_access.inc();
if let Ok(Some(bytes)) = entry.read_part(part_id, range_in_part.clone()).await {
db_stats.object_store_cache_part_hits.inc();
return Ok(bytes);
}
let range = Range {
start: (part_id * part_size) as u64,
end: ((part_id + 1) * part_size) as u64,
};
let get_result = object_store
.get_opts(
&location,
GetOptions {
range: Some(GetRange::Bounded(range)),
..Default::default()
},
)
.await?;
let meta = get_result.meta.clone();
let attrs = get_result.attributes.clone();
let bytes = get_result.bytes().await?;
entry.save_head((&meta, &attrs)).await.ok();
entry.save_part(part_id, bytes.clone()).await.ok();
Ok(Bytes::copy_from_slice(&bytes.slice(range_in_part)))
})
}
fn canonicalize_range(
&self,
range: Option<GetRange>,
object_size: u64,
) -> object_store::Result<Range<u64>> {
let (start_offset, end_offset) = match range {
None => (0, object_size),
Some(range) => match range {
GetRange::Bounded(range) => {
if range.start >= object_size {
return Err(object_store::Error::Generic {
store: "cached_object_store",
source: Box::new(InvalidGetRange::StartTooLarge {
requested: range.start,
length: object_size,
}),
});
}
if range.start >= range.end {
return Err(object_store::Error::Generic {
store: "cached_object_store",
source: Box::new(InvalidGetRange::Inconsistent {
start: range.start,
end: range.end,
}),
});
}
(range.start, range.end.min(object_size))
}
GetRange::Offset(offset) => {
if offset >= object_size {
return Err(object_store::Error::Generic {
store: "cached_object_store",
source: Box::new(InvalidGetRange::StartTooLarge {
requested: offset,
length: object_size,
}),
});
}
(offset, object_size)
}
GetRange::Suffix(suffix) => (object_size.saturating_sub(suffix), object_size),
},
};
Ok(Range {
start: start_offset,
end: end_offset,
})
}
fn align_get_range(&self, range: &GetRange) -> GetRange {
match range {
GetRange::Bounded(bounded) => {
let aligned = self.align_range(bounded, self.part_size_bytes);
GetRange::Bounded(aligned)
}
GetRange::Suffix(suffix) => {
let suffix_aligned = self.align_range(&(0..*suffix), self.part_size_bytes).end;
GetRange::Suffix(suffix_aligned)
}
GetRange::Offset(offset) => {
let offset_aligned = *offset - *offset % self.part_size_bytes as u64;
GetRange::Offset(offset_aligned)
}
}
}
fn align_range(&self, range: &Range<u64>, alignment: usize) -> Range<u64> {
let alignment = alignment as u64;
let start_aligned = range.start - range.start % alignment;
let end_aligned = range.end.div_ceil(alignment) * alignment;
Range {
start: start_aligned,
end: end_aligned,
}
}
}
impl std::fmt::Display for CachedObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"CachedObjectStore({}, {})",
self.object_store, self.cache_storage
)
}
}
#[async_trait::async_trait]
impl ObjectStore for CachedObjectStore {
async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
self.cached_get_opts(location, options).await
}
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
self.cached_head(location).await
}
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> object_store::Result<PutResult> {
self.object_store.put_opts(location, payload, opts).await
}
async fn put_multipart(
&self,
location: &Path,
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.object_store.put_multipart(location).await
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOpts,
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.object_store.put_multipart_opts(location, opts).await
}
async fn delete(&self, location: &Path) -> object_store::Result<()> {
self.object_store.delete(location).await
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
self.object_store.list(prefix)
}
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
self.object_store.list_with_offset(prefix, offset)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
self.object_store.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.object_store.copy(from, to).await
}
async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.object_store.rename(from, to).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.object_store.copy_if_not_exists(from, to).await
}
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.object_store.rename_if_not_exists(from, to).await
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum InvalidGetRange {
#[error("Range start too large, requested: {requested}, length: {length}")]
StartTooLarge { requested: u64, length: u64 },
#[error("Range started at {start} and ended at {end}")]
Inconsistent { start: u64, end: u64 },
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use object_store::{path::Path, GetOptions, GetRange, ObjectStore, PutPayload};
use rand::Rng;
use super::CachedObjectStore;
use crate::cached_object_store::stats::CachedObjectStoreStats;
use crate::cached_object_store::storage_fs::FsCacheStorage;
use crate::cached_object_store::{storage::PartID, storage_fs::FsCacheEntry};
use crate::stats::StatRegistry;
use crate::test_utils::gen_rand_bytes;
fn new_test_cache_folder() -> std::path::PathBuf {
let mut rng = rand::thread_rng();
let dir_name: String = (0..10)
.map(|_| rng.sample(rand::distributions::Alphanumeric) as char)
.collect();
let path = format!("/tmp/testcache-{}", dir_name);
let _ = std::fs::remove_dir_all(&path);
std::path::PathBuf::from(path)
}
#[tokio::test]
async fn test_save_result_not_aligned() -> object_store::Result<()> {
let payload = gen_rand_bytes(1024 * 3 + 32);
let object_store = Arc::new(object_store::memory::InMemory::new());
let test_cache_folder = new_test_cache_folder();
let stats_registry = StatRegistry::new();
let stats = Arc::new(CachedObjectStoreStats::new(&stats_registry));
object_store
.put(
&Path::from("/data/testfile1"),
PutPayload::from_bytes(payload.clone()),
)
.await?;
let location = Path::from("/data/testfile1");
let get_result = object_store.get(&location).await?;
let cache_storage = Arc::new(FsCacheStorage::new(
test_cache_folder.clone(),
None,
None,
stats.clone(),
));
let part_size = 1024;
let cached_store =
CachedObjectStore::new(object_store.clone(), cache_storage, part_size, stats).unwrap();
let entry = cached_store.cache_storage.entry(&location, 1024);
let object_size_hint = cached_store.save_result(get_result).await?;
assert_eq!(object_size_hint, 1024 * 3 + 32);
let head = entry.read_head().await?;
assert_eq!(head.unwrap().0.size, 1024 * 3 + 32);
let cached_parts = entry.cached_parts().await?;
assert_eq!(cached_parts.len(), 4);
assert_eq!(
entry.read_part(0, 0..part_size).await?,
Some(payload.slice(0..1024))
);
assert_eq!(
entry.read_part(1, 0..part_size).await?,
Some(payload.slice(1024..2048))
);
assert_eq!(
entry.read_part(2, 0..part_size).await?,
Some(payload.slice(2048..3072))
);
let evict_part_path =
FsCacheEntry::make_part_path(test_cache_folder.clone(), &location, 2, 1024);
std::fs::remove_file(evict_part_path).unwrap();
assert_eq!(entry.read_part(2, 0..part_size).await?, None);
let cached_parts = entry.cached_parts().await?;
assert_eq!(cached_parts, vec![0, 1, 3]);
let evict_part_path =
FsCacheEntry::make_part_path(test_cache_folder.clone(), &location, 3, 1024);
std::fs::remove_file(evict_part_path).unwrap();
assert_eq!(entry.read_part(3, 0..part_size).await?, None);
let cached_parts = entry.cached_parts().await?;
assert_eq!(cached_parts, vec![0, 1]);
Ok(())
}
#[tokio::test]
async fn test_save_result_aligned() -> object_store::Result<()> {
let payload = gen_rand_bytes(1024 * 3);
let object_store = Arc::new(object_store::memory::InMemory::new());
let test_cache_folder = new_test_cache_folder();
let stats_registry = StatRegistry::new();
let stats = Arc::new(CachedObjectStoreStats::new(&stats_registry));
object_store
.put(
&Path::from("/data/testfile1"),
PutPayload::from_bytes(payload.clone()),
)
.await?;
let location = Path::from("/data/testfile1");
let get_result = object_store.get(&location).await?;
let part_size = 1024;
let cache_storage = Arc::new(FsCacheStorage::new(
test_cache_folder.clone(),
None,
None,
stats.clone(),
));
let cached_store =
CachedObjectStore::new(object_store, cache_storage, part_size, stats).unwrap();
let entry = cached_store.cache_storage.entry(&location, part_size);
let object_size_hint = cached_store.save_result(get_result).await?;
assert_eq!(object_size_hint, 1024 * 3);
let cached_parts = entry.cached_parts().await?;
assert_eq!(cached_parts.len(), 3);
assert_eq!(
entry.read_part(0, 0..part_size).await?,
Some(payload.slice(0..1024))
);
assert_eq!(
entry.read_part(1, 0..part_size).await?,
Some(payload.slice(1024..2048))
);
assert_eq!(
entry.read_part(2, 0..part_size).await?,
Some(payload.slice(2048..3072))
);
let evict_part_path =
FsCacheEntry::make_part_path(test_cache_folder.clone(), &location, 2, part_size);
std::fs::remove_file(evict_part_path).unwrap();
assert_eq!(entry.read_part(2, 0..part_size).await?, None);
let cached_parts = entry.cached_parts().await?;
assert_eq!(cached_parts.len(), 2);
Ok(())
}
#[test]
fn test_split_range_into_parts() {
let object_store = Arc::new(object_store::memory::InMemory::new());
let test_cache_folder = new_test_cache_folder();
let stats_registry = StatRegistry::new();
let stats = Arc::new(CachedObjectStoreStats::new(&stats_registry));
let cache_storage = Arc::new(FsCacheStorage::new(
test_cache_folder.clone(),
None,
None,
stats.clone(),
));
let cached_store =
CachedObjectStore::new(object_store, cache_storage, 1024, stats).unwrap();
struct Test {
input: (Option<GetRange>, usize),
expect: Vec<(PartID, std::ops::Range<usize>)>,
}
let tests = [
Test {
input: (None, 1024 * 3),
expect: vec![(0, 0..1024), (1, 0..1024), (2, 0..1024)],
},
Test {
input: (None, 1024 * 3 + 12),
expect: vec![(0, 0..1024), (1, 0..1024), (2, 0..1024), (3, 0..12)],
},
Test {
input: (None, 12),
expect: vec![(0, 0..12)],
},
Test {
input: (Some(GetRange::Bounded(0..1024)), 1024),
expect: vec![(0, 0..1024)],
},
Test {
input: (Some(GetRange::Bounded(128..1024)), 20000),
expect: vec![(0, 128..1024)],
},
Test {
input: (Some(GetRange::Bounded(128..1024 + 12)), 20000),
expect: vec![(0, 128..1024), (1, 0..12)],
},
Test {
input: (Some(GetRange::Bounded(128..1024 * 2 + 12)), 20000),
expect: vec![(0, 128..1024), (1, 0..1024), (2, 0..12)],
},
Test {
input: (Some(GetRange::Bounded(1024 * 2..1024 * 3 + 12)), 200000),
expect: vec![(2, 0..1024), (3, 0..12)],
},
Test {
input: (Some(GetRange::Bounded(1024 * 2 - 2..1024 * 3 + 12)), 20000),
expect: vec![(1, 1022..1024), (2, 0..1024), (3, 0..12)],
},
Test {
input: (Some(GetRange::Suffix(128)), 1024),
expect: vec![(0, 896..1024)],
},
Test {
input: (Some(GetRange::Suffix(1024 * 2 + 8)), 1024 * 4),
expect: vec![(1, 1016..1024), (2, 0..1024), (3, 0..1024)],
},
Test {
input: (Some(GetRange::Offset(8)), 1024 * 4),
expect: vec![(0, 8..1024), (1, 0..1024), (2, 0..1024), (3, 0..1024)],
},
Test {
input: (Some(GetRange::Offset(1024 * 2 + 8)), 1024 * 4),
expect: vec![(2, 8..1024), (3, 0..1024)],
},
Test {
input: (Some(GetRange::Offset(1024 * 2 + 8)), 1024 * 4 + 2),
expect: vec![(2, 8..1024), (3, 0..1024), (4, 0..2)],
},
];
for t in tests.iter() {
let range = cached_store
.canonicalize_range(t.input.0.clone(), t.input.1 as u64)
.unwrap();
let parts = cached_store.split_range_into_parts(range);
assert_eq!(parts, t.expect, "input: {:?}", t.input);
}
}
#[test]
fn test_align_range() {
let object_store = Arc::new(object_store::memory::InMemory::new());
let test_cache_folder = new_test_cache_folder();
let stats_registry = StatRegistry::new();
let stats = Arc::new(CachedObjectStoreStats::new(&stats_registry));
let cache_storage = Arc::new(FsCacheStorage::new(
test_cache_folder.clone(),
None,
None,
stats.clone(),
));
let cached_store =
CachedObjectStore::new(object_store, cache_storage, 1024, stats).unwrap();
let aligned = cached_store.align_range(&(9..1025), 1024);
assert_eq!(aligned, 0..2048);
let aligned = cached_store.align_range(&(1024 + 1..2048 + 4), 1024);
assert_eq!(aligned, 1024..3072);
}
#[test]
fn test_align_get_range() {
let object_store = Arc::new(object_store::memory::InMemory::new());
let test_cache_folder = new_test_cache_folder();
let stats_registry = StatRegistry::new();
let stats = Arc::new(CachedObjectStoreStats::new(&stats_registry));
let cache_storage = Arc::new(FsCacheStorage::new(
test_cache_folder.clone(),
None,
None,
stats.clone(),
));
let cached_store =
CachedObjectStore::new(object_store, cache_storage, 1024, stats).unwrap();
let aligned = cached_store.align_get_range(&GetRange::Bounded(9..1025));
assert_eq!(aligned, GetRange::Bounded(0..2048));
let aligned = cached_store.align_get_range(&GetRange::Bounded(9..2048));
assert_eq!(aligned, GetRange::Bounded(0..2048));
let aligned = cached_store.align_get_range(&GetRange::Suffix(12));
assert_eq!(aligned, GetRange::Suffix(1024));
let aligned = cached_store.align_get_range(&GetRange::Suffix(1024));
assert_eq!(aligned, GetRange::Suffix(1024));
let aligned = cached_store.align_get_range(&GetRange::Offset(1024));
assert_eq!(aligned, GetRange::Offset(1024));
let aligned = cached_store.align_get_range(&GetRange::Offset(12));
assert_eq!(aligned, GetRange::Offset(0));
}
#[tokio::test]
async fn test_cached_object_store_impl_object_store() -> object_store::Result<()> {
let object_store = Arc::new(object_store::memory::InMemory::new());
let test_cache_folder = new_test_cache_folder();
let stats_registry = StatRegistry::new();
let stats = Arc::new(CachedObjectStoreStats::new(&stats_registry));
let cache_storage = Arc::new(FsCacheStorage::new(
test_cache_folder.clone(),
None,
None,
stats.clone(),
));
let cached_store =
CachedObjectStore::new(object_store.clone(), cache_storage, 1024, stats).unwrap();
let test_path = Path::from("/data/testdata1");
let test_payload = gen_rand_bytes(1024 * 3 + 2);
object_store
.put(&test_path, PutPayload::from_bytes(test_payload.clone()))
.await?;
let test_ranges = vec![
Some(GetRange::Offset(260817)),
None,
Some(GetRange::Bounded(1000..2048)),
Some(GetRange::Bounded(1000..260817)),
Some(GetRange::Suffix(10)),
Some(GetRange::Suffix(260817)),
Some(GetRange::Offset(1000)),
Some(GetRange::Offset(0)),
Some(GetRange::Offset(1028)),
Some(GetRange::Offset(260817)),
Some(GetRange::Offset(1024 * 3 + 2)),
Some(GetRange::Offset(1024 * 3 + 1)),
#[allow(clippy::reversed_empty_ranges)]
Some(GetRange::Bounded(2900..2048)),
Some(GetRange::Bounded(10..10)),
];
for range in test_ranges.iter() {
let want = object_store
.get_opts(
&test_path,
GetOptions {
range: range.clone(),
..Default::default()
},
)
.await;
let got = cached_store
.cached_get_opts(
&test_path,
GetOptions {
range: range.clone(),
..Default::default()
},
)
.await;
match (want, got) {
(Ok(want), Ok(got)) => {
assert_eq!(want.range, got.range);
assert_eq!(want.meta, got.meta);
assert_eq!(want.bytes().await?, got.bytes().await?);
}
(Err(want), Err(got)) => {
if want.to_string().to_lowercase().contains("range") {
assert!(got.to_string().to_lowercase().contains("range"));
}
}
(origin_result, cached_result) => {
panic!("expect: {:?}, got: {:?}", origin_result, cached_result);
}
}
}
Ok(())
}
}