use cbor2::{from_reader, to_writer};
use futures::{StreamExt, TryStreamExt, stream::BoxStream};
use moka::{future::Cache, ops::compute::Op};
use object_store::{path::Path, *};
use serde::{Serialize, de::DeserializeOwned};
use std::sync::Arc;
use crate::map_arc_error;
/// Contract for the metadata record that [`SidecarStore`] keeps next to each data object.
///
/// Records are written with `to_writer` and read back with `from_reader`, which is why the
/// serde bounds are required.
pub(crate) trait SidecarMeta: Serialize + DeserializeOwned + Send + Sync + 'static {
/// Store name reported in the `store` field of `Error::Generic` and passed to `map_arc_error`.
const STORE_NAME: &'static str;
/// E-tag carried by the metadata record; listings requested with `with_logical_e_tag`
/// report this value as the object's e-tag.
fn e_tag(&self) -> Option<&str>;
/// Records the e-tag and version of the underlying data object, as returned by a `head`
/// request on the data path.
fn set_original(&mut self, e_tag: Option<String>, version: Option<String>);
}
/// Wraps an object store and keeps a metadata record of type `M` alongside each data object.
///
/// Data objects and metadata objects live in the same underlying store under two different
/// prefixes; deserialized metadata is cached in `meta_cache`.
pub(crate) struct SidecarStore<T: ObjectStore, M: SidecarMeta> {
/// Underlying store that holds both the data objects and the metadata objects.
pub(crate) store: T,
/// Prefix prepended to a logical location to form the data object path.
data_prefix: Path,
/// Prefix prepended to a logical location to form the metadata object path.
meta_prefix: Path,
/// Cache of deserialized metadata, keyed by the logical (unprefixed) location.
meta_cache: Cache<Path, Arc<M>>,
}
impl<T: ObjectStore, M: SidecarMeta> SidecarStore<T, M> {
pub(crate) fn new(store: T, meta_cache: Cache<Path, Arc<M>>) -> Self {
SidecarStore {
store,
data_prefix: Path::from("data"),
meta_prefix: Path::from("meta"),
meta_cache,
}
}
/// Maps a logical location to the path of its metadata object: the parts of `meta_prefix`
/// followed by the parts of `location`.
pub(crate) fn meta_path(&self, location: &Path) -> Path {
    let prefix_parts = self.meta_prefix.parts();
    let location_parts = location.parts();
    prefix_parts.chain(location_parts).collect::<Path>()
}
/// Maps a logical location to the path of its data object: the parts of `data_prefix`
/// followed by the parts of `location`.
pub(crate) fn full_path(&self, location: &Path) -> Path {
    let prefix_parts = self.data_prefix.parts();
    let location_parts = location.parts();
    prefix_parts.chain(location_parts).collect::<Path>()
}
/// Converts a data object path back into a logical location by removing `data_prefix`.
///
/// A path that does not start with `data_prefix` is returned unchanged.
pub(crate) fn strip_prefix(&self, path: Path) -> Path {
    // Build the owned suffix first so the borrow of `path` ends before it may be moved.
    let stripped: Option<Path> = path
        .prefix_match(&self.data_prefix)
        .map(|suffix| suffix.collect::<Path>());
    stripped.unwrap_or(path)
}
async fn load_meta(&self, location: &Path) -> Result<M> {
let meta_path = self.meta_path(location);
let data = self.store.get(&meta_path).await?;
let data = data.bytes().await?;
let meta: M = from_reader(&data[..]).map_err(|err| Error::Generic {
store: M::STORE_NAME,
source: format!("Failed to deserialize Metadata for path {location}: {err:?}").into(),
})?;
Ok(meta)
}
/// Returns the metadata for `location`, going through the cache's `try_get_with` so the
/// store is only read when the cache has no entry for that location.
///
/// # Errors
/// A failed load is converted with `map_arc_error` before being returned.
pub(crate) async fn get_meta(&self, location: &Path) -> Result<Arc<M>> {
    // Initializer handed to the cache; it is the only place the store is read.
    let loader = async {
        let loaded = self.load_meta(location).await?;
        Ok(Arc::new(loaded))
    };
    let shared = self
        .meta_cache
        .try_get_with(location.clone(), loader)
        .await
        .map_err(|err| map_arc_error(M::STORE_NAME, err))?;
    Ok(shared)
}
pub(crate) async fn put_meta(&self, location: &Path, meta: M) -> Result<PutResult> {
let meta_path = self.meta_path(location);
let mut data = Vec::new();
to_writer(&meta, &mut data).map_err(|err| Error::Generic {
store: M::STORE_NAME,
source: format!("Failed to serialize Metadata for path {location}: {err:?}").into(),
})?;
let rt = self
.store
.put_opts(&meta_path, data.into(), PutOptions::default())
.await?;
self.meta_cache
.insert(location.clone(), Arc::new(meta))
.await;
Ok(rt)
}
/// Read-modify-write of the metadata for `location`, performed inside the cache's
/// per-entry compute (`and_try_compute_with`).
///
/// `f` receives the current metadata: the cached value when the cache has an entry,
/// otherwise the value loaded from the store, or `None` when the store reports `NotFound`.
/// The value returned by `f` is serialized, written to the metadata path, and then handed
/// to the cache via `Op::Put`. The new value is returned.
///
/// # Errors
/// Returns errors from `f`, from loading (other than `NotFound`), from serialization
/// (`Error::Generic` tagged with `M::STORE_NAME`), or from the store write. In each of
/// these cases the closure returns before producing `Op::Put`.
pub(crate) async fn update_meta_with<F>(&self, location: &Path, f: F) -> Result<Arc<M>>
where
F: AsyncFnOnce(Option<&M>) -> Result<M>,
{
let rt = self
.meta_cache
.entry(location.clone())
.and_try_compute_with(|entry| async {
// Pick the current value: cache first, then the store; a missing object maps to `None`.
let val = match entry {
Some(meta) => f(Some(meta.value())).await?,
None => match self.load_meta(location).await {
Ok(meta) => f(Some(&meta)).await?,
Err(Error::NotFound { .. }) => f(None).await?,
Err(err) => return Err(err),
},
};
// Persist the new value before the cache is told about it.
let meta_path = self.meta_path(location);
let mut data = Vec::new();
to_writer(&val, &mut data).map_err(|err| Error::Generic {
store: M::STORE_NAME,
source: format!("Failed to serialize Metadata for path {location}: {err:?}")
.into(),
})?;
self.store
.put_opts(&meta_path, data.into(), PutOptions::default())
.await?;
Ok::<_, Error>(Op::Put(Arc::new(val)))
})
.await?;
// The closure only ever returns `Op::Put` on success.
// NOTE(review): `unwrap` is presumably infallible for that reason — confirm against moka's `CompResult` docs.
Ok(rt.unwrap().value().clone())
}
/// Evicts the cached metadata for `location`, if any; the store is not touched.
async fn remove_meta_cache(&self, location: &Path) {
    // The previously cached value (if there was one) is not needed by any caller.
    let _ = self.meta_cache.remove(location).await;
}
async fn refresh_meta_original_tag(&self, location: &Path) -> Result<()> {
let mut meta = self.load_meta(location).await?;
let obj = self.store.head(&self.full_path(location)).await?;
meta.set_original(obj.e_tag, obj.version);
self.put_meta(location, meta).await?;
Ok(())
}
pub(crate) fn delete_stream(
self: Arc<Self>,
locations: BoxStream<'static, Result<Path>>,
) -> BoxStream<'static, Result<Path>> {
let inner = self;
locations
.map(move |location| {
let inner = inner.clone();
async move {
let location = location?;
let data_res = inner.store.delete(&inner.full_path(&location)).await;
let meta_res = inner.store.delete(&inner.meta_path(&location)).await;
inner.remove_meta_cache(&location).await;
match (data_res, meta_res) {
(Ok(()), Ok(()) | Err(Error::NotFound { .. })) => Ok(location),
(Ok(()), Err(err)) => Err(err),
(Err(Error::NotFound { source, .. }), _) => Err(Error::NotFound {
path: location.to_string(),
source,
}),
(Err(err), _) => Err(err),
}
}
})
.buffered(10)
.boxed()
}
/// Lists data objects under the logical `prefix` (everything under the data prefix when
/// `prefix` is `None`) and returns them with logical locations.
///
/// When `with_logical_e_tag` is true, each entry's e-tag is taken from its metadata record.
pub(crate) fn list(
    self: Arc<Self>,
    prefix: Option<&Path>,
    with_logical_e_tag: bool,
) -> BoxStream<'static, Result<ObjectMeta>> {
    let scoped = match prefix {
        Some(logical) => self.full_path(logical),
        None => self.full_path(&Path::default()),
    };
    let raw_listing = self.store.list(Some(&scoped));
    self.decorate_listing(raw_listing, with_logical_e_tag)
}
/// Same as `list`, but forwards to the store's `list_with_offset` with the logical
/// `offset` translated to its data path.
///
/// When `with_logical_e_tag` is true, each entry's e-tag is taken from its metadata record.
pub(crate) fn list_with_offset(
    self: Arc<Self>,
    prefix: Option<&Path>,
    offset: &Path,
    with_logical_e_tag: bool,
) -> BoxStream<'static, Result<ObjectMeta>> {
    let scoped_offset = self.full_path(offset);
    let scoped_prefix = match prefix {
        Some(logical) => self.full_path(logical),
        None => self.full_path(&Path::default()),
    };
    let raw_listing = self
        .store
        .list_with_offset(Some(&scoped_prefix), &scoped_offset);
    self.decorate_listing(raw_listing, with_logical_e_tag)
}
fn decorate_listing(
self: Arc<Self>,
stream: BoxStream<'static, Result<ObjectMeta>>,
with_logical_e_tag: bool,
) -> BoxStream<'static, Result<ObjectMeta>> {
let inner = self;
if !with_logical_e_tag {
return stream
.map_ok(move |mut obj| {
obj.location = inner.strip_prefix(obj.location);
obj
})
.boxed();
}
stream
.map_ok(move |mut obj| {
let store = inner.clone();
async move {
let location = store.strip_prefix(obj.location);
let meta = store.get_meta(&location).await?;
obj.location = location;
obj.e_tag = meta.e_tag().map(String::from);
Ok::<ObjectMeta, Error>(obj)
}
})
.try_buffered(8) .boxed()
}
pub(crate) async fn list_with_delimiter(
&self,
prefix: Option<&Path>,
with_logical_e_tag: bool,
) -> Result<ListResult> {
let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
let rt = self.store.list_with_delimiter(Some(&prefix)).await?;
let common_prefixes = rt
.common_prefixes
.into_iter()
.map(|p| self.strip_prefix(p))
.collect::<Vec<_>>();
let objects = rt
.objects
.into_iter()
.map(|mut meta| {
meta.location = self.strip_prefix(meta.location);
meta
})
.collect::<Vec<_>>();
if !with_logical_e_tag {
return Ok(ListResult {
common_prefixes,
objects,
});
}
let mut indexed = futures::stream::iter(objects.into_iter().enumerate().map(
move |(idx, mut obj)| async move {
let meta = self.get_meta(&obj.location).await?;
obj.e_tag = meta.e_tag().map(String::from);
Ok::<(usize, ObjectMeta), Error>((idx, obj))
},
))
.buffer_unordered(8)
.try_collect::<Vec<_>>()
.await?;
indexed.sort_by_key(|(idx, _)| *idx);
let objects = indexed.into_iter().map(|(_, obj)| obj).collect();
Ok(ListResult {
common_prefixes,
objects,
})
}
/// Copies the object at logical location `from` to `to`, together with its metadata.
///
/// Steps, in order:
/// 1. copy the data object using the caller's `options`;
/// 2. copy the metadata object, always in `CopyMode::Overwrite` but with the caller's
///    extensions;
/// 3. evict the cached metadata for `to`;
/// 4. refresh the copied metadata with the e-tag/version of the new data object.
///
/// # Errors
/// Returns the first error encountered; earlier steps are not rolled back.
pub(crate) async fn copy_opts(
    &self,
    from: &Path,
    to: &Path,
    options: CopyOptions,
) -> Result<()> {
    self.store
        .copy_opts(&self.full_path(from), &self.full_path(to), options.clone())
        .await?;

    let sidecar_options = CopyOptions {
        mode: CopyMode::Overwrite,
        extensions: options.extensions,
    };
    self.store
        .copy_opts(&self.meta_path(from), &self.meta_path(to), sidecar_options)
        .await?;

    self.remove_meta_cache(to).await;
    self.refresh_meta_original_tag(to).await
}
/// Renames the object at logical location `from` to `to`, together with its metadata.
///
/// Steps, in order:
/// 1. rename the data object using the caller's `options`;
/// 2. evict the cached metadata for `from`;
/// 3. rename the metadata object, always with `RenameTargetMode::Overwrite` but with the
///    caller's extensions;
/// 4. evict the cached metadata for `to`;
/// 5. refresh the moved metadata with the e-tag/version of the renamed data object.
///
/// # Errors
/// Returns the first error encountered; earlier steps are not rolled back.
pub(crate) async fn rename_opts(
    &self,
    from: &Path,
    to: &Path,
    options: RenameOptions,
) -> Result<()> {
    self.store
        .rename_opts(&self.full_path(from), &self.full_path(to), options.clone())
        .await?;
    self.remove_meta_cache(from).await;

    let sidecar_options = RenameOptions {
        target_mode: RenameTargetMode::Overwrite,
        extensions: options.extensions,
    };
    self.store
        .rename_opts(&self.meta_path(from), &self.meta_path(to), sidecar_options)
        .await?;

    self.remove_meta_cache(to).await;
    self.refresh_meta_original_tag(to).await
}
}