use anda_core::{BoxError, BoxPinFut, ObjectMeta, Path, PutMode, PutResult, path_join};
use futures::TryStreamExt;
use object_store::PutOptions;
use std::sync::Arc;
pub use object_store::{ObjectStore, ObjectStoreExt, local::LocalFileSystem, memory::InMemory};
pub const MAX_STORE_OBJECT_SIZE: usize = 1024 * 1024 * 2;
pub trait VectorSearchFeaturesDyn: Send + Sync + 'static {
fn top_n(
&self,
namespace: Path,
query: String,
n: usize,
) -> BoxPinFut<Result<Vec<String>, BoxError>>;
fn top_n_ids(
&self,
namespace: Path,
query: String,
n: usize,
) -> BoxPinFut<Result<Vec<String>, BoxError>>;
}
#[derive(Clone)]
pub struct VectorStore {
inner: Arc<dyn VectorSearchFeaturesDyn>,
}
impl VectorStore {
pub fn new(inner: Arc<dyn VectorSearchFeaturesDyn>) -> Self {
Self { inner }
}
pub fn not_implemented() -> Self {
Self {
inner: Arc::new(NotImplemented),
}
}
}
impl VectorSearchFeaturesDyn for VectorStore {
fn top_n(
&self,
namespace: Path,
query: String,
n: usize,
) -> BoxPinFut<Result<Vec<String>, BoxError>> {
self.inner.top_n(namespace, query, n)
}
fn top_n_ids(
&self,
namespace: Path,
query: String,
n: usize,
) -> BoxPinFut<Result<Vec<String>, BoxError>> {
self.inner.top_n_ids(namespace, query, n)
}
}
#[derive(Clone, Debug)]
pub struct NotImplemented;
impl VectorSearchFeaturesDyn for NotImplemented {
fn top_n(
&self,
_namespace: Path,
_query: String,
_n: usize,
) -> BoxPinFut<Result<Vec<String>, BoxError>> {
Box::pin(futures::future::ready(Err("not implemented".into())))
}
fn top_n_ids(
&self,
_namespace: Path,
_query: String,
_n: usize,
) -> BoxPinFut<Result<Vec<String>, BoxError>> {
Box::pin(futures::future::ready(Err("not implemented".into())))
}
}
#[derive(Clone, Debug)]
pub struct MockImplemented;
impl VectorSearchFeaturesDyn for MockImplemented {
fn top_n(
&self,
_namespace: Path,
_query: String,
_n: usize,
) -> BoxPinFut<Result<Vec<String>, BoxError>> {
Box::pin(futures::future::ready(Ok(vec![])))
}
fn top_n_ids(
&self,
_namespace: Path,
_query: String,
_n: usize,
) -> BoxPinFut<Result<Vec<String>, BoxError>> {
Box::pin(futures::future::ready(Ok(vec![])))
}
}
#[derive(Clone)]
pub struct Store {
store: Arc<dyn ObjectStore>,
}
impl Store {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self { store }
}
pub async fn store_get(
&self,
namespace: &Path,
path: &Path,
) -> Result<(bytes::Bytes, ObjectMeta), BoxError> {
let path = path_join(namespace, path);
let res = self.store.get_opts(&path, Default::default()).await?;
let data = match res.payload {
object_store::GetResultPayload::Stream(mut stream) => {
let mut buf = bytes::BytesMut::new();
while let Some(data) = stream.try_next().await? {
buf.extend_from_slice(&data);
}
buf.freeze() }
_ => return Err("StoreFeatures: unexpected payload from get_opts".into()),
};
Ok((data, res.meta))
}
pub async fn store_list(
&self,
namespace: &Path,
prefix: Option<&Path>,
offset: &Path,
) -> Result<Vec<ObjectMeta>, BoxError> {
let prefix = prefix.map(|p| path_join(namespace, p));
let offset = path_join(namespace, offset);
let mut res = if offset.is_root() {
self.store.list(prefix.as_ref())
} else {
self.store.list_with_offset(prefix.as_ref(), &offset)
};
let mut metas = Vec::new();
while let Some(meta) = res.try_next().await? {
metas.push(meta)
}
Ok(metas)
}
pub async fn store_put(
&self,
namespace: &Path,
path: &Path,
mode: PutMode,
val: bytes::Bytes,
) -> Result<PutResult, BoxError> {
let full_path = path_join(namespace, path);
let res = self
.store
.put_opts(
&full_path,
val.into(),
PutOptions {
mode,
..Default::default()
},
)
.await?;
Ok(res)
}
pub async fn store_rename_if_not_exists(
&self,
namespace: &Path,
from: &Path,
to: &Path,
) -> Result<(), BoxError> {
let from = path_join(namespace, from);
let to = path_join(namespace, to);
self.store.rename_if_not_exists(&from, &to).await?;
Ok(())
}
pub async fn store_delete(&self, namespace: &Path, path: &Path) -> Result<(), BoxError> {
let path = path_join(namespace, path);
self.store.delete(&path).await?;
Ok(())
}
}