use std::sync::Arc;
use tokio::task::spawn_blocking;
use crate::collection::{Collection, DocSet, WriteResult, WriteSummary};
use crate::doc::Doc;
use crate::error::{ErrorCode, Result, ZvecError};
use crate::index_params::IndexParams;
use crate::options::CollectionOptions;
use crate::query::VectorQuery;
use crate::schema::{CollectionSchema, FieldSchema};
use crate::stats::CollectionStats;
/// Async-facing handle to a [`Collection`].
///
/// The handle stores the collection behind an [`Arc`], so cloning an
/// `AsyncCollection` yields another handle to the same underlying
/// `Collection` rather than a copy of it.
#[derive(Clone)]
pub struct AsyncCollection {
// Shared collection; each async method clones this `Arc` and moves the
// clone into the closure it hands to `run_blocking`.
inner: Arc<Collection>,
}
// Every async method below follows the same shape: take owned arguments,
// clone the shared `Arc<Collection>`, and run the matching synchronous
// `Collection` method inside `run_blocking`. Besides the error returned by
// the underlying `Collection` call, each method can therefore also fail with
// the `ErrorCode::Internal` error that `run_blocking` produces when the
// blocking task cannot be joined.
impl AsyncCollection {
    /// Calls `Collection::create_and_open` with `path`, `schema` and
    /// `options` on a blocking task and wraps the resulting collection.
    ///
    /// # Errors
    ///
    /// Returns the error from `Collection::create_and_open`, or an
    /// `ErrorCode::Internal` error if the blocking task fails.
    pub async fn create_and_open(
        path: impl Into<String>,
        schema: CollectionSchema,
        options: Option<CollectionOptions>,
    ) -> Result<Self> {
        let path = path.into();
        let inner =
            run_blocking(move || Collection::create_and_open(&path, &schema, options.as_ref()))
                .await?;
        Ok(Self {
            inner: Arc::new(inner),
        })
    }

    /// Calls `Collection::open` with `path` and `options` on a blocking task
    /// and wraps the resulting collection.
    ///
    /// # Errors
    ///
    /// Returns the error from `Collection::open`, or an
    /// `ErrorCode::Internal` error if the blocking task fails.
    pub async fn open(path: impl Into<String>, options: Option<CollectionOptions>) -> Result<Self> {
        let path = path.into();
        let inner = run_blocking(move || Collection::open(&path, options.as_ref())).await?;
        Ok(Self {
            inner: Arc::new(inner),
        })
    }

    /// Wraps an already shared `Collection` without opening anything.
    pub fn from_arc(inner: Arc<Collection>) -> Self {
        Self { inner }
    }

    /// Returns the shared handle to the wrapped synchronous `Collection`.
    pub fn inner(&self) -> &Arc<Collection> {
        &self.inner
    }

    /// Runs `Collection::flush` on a blocking task.
    pub async fn flush(&self) -> Result<()> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || inner.flush()).await
    }

    /// Runs `Collection::optimize` on a blocking task.
    pub async fn optimize(&self) -> Result<()> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || inner.optimize()).await
    }

    /// Runs `Collection::schema` on a blocking task and returns its result.
    pub async fn schema(&self) -> Result<CollectionSchema> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || inner.schema()).await
    }

    /// Runs `Collection::options` on a blocking task and returns its result.
    pub async fn options(&self) -> Result<CollectionOptions> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || inner.options()).await
    }

    /// Runs `Collection::stats` on a blocking task and returns its result.
    pub async fn stats(&self) -> Result<CollectionStats> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || inner.stats()).await
    }

    /// Runs `Collection::create_index` for `field_name` with `params` on a
    /// blocking task.
    pub async fn create_index(
        &self,
        field_name: impl Into<String>,
        params: IndexParams,
    ) -> Result<()> {
        let inner = Arc::clone(&self.inner);
        let field_name = field_name.into();
        // `params` is owned by the closure and lent to the synchronous call.
        run_blocking(move || inner.create_index(&field_name, &params)).await
    }

    /// Runs `Collection::drop_index` for `field_name` on a blocking task.
    pub async fn drop_index(&self, field_name: impl Into<String>) -> Result<()> {
        let inner = Arc::clone(&self.inner);
        let field_name = field_name.into();
        run_blocking(move || inner.drop_index(&field_name)).await
    }

    /// Runs `Collection::add_column` on a blocking task, passing `field` by
    /// reference and `expression` as an optional string slice.
    pub async fn add_column(&self, field: FieldSchema, expression: Option<String>) -> Result<()> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || inner.add_column(&field, expression.as_deref())).await
    }

    /// Runs `Collection::drop_column` for `column_name` on a blocking task.
    pub async fn drop_column(&self, column_name: impl Into<String>) -> Result<()> {
        let inner = Arc::clone(&self.inner);
        let name = column_name.into();
        run_blocking(move || inner.drop_column(&name)).await
    }

    /// Runs `Collection::alter_column` for `column_name` on a blocking task.
    ///
    /// `new_name` and `new_schema` are forwarded as optional borrows; how
    /// `None` is interpreted is decided by `Collection::alter_column`.
    pub async fn alter_column(
        &self,
        column_name: impl Into<String>,
        new_name: Option<String>,
        new_schema: Option<FieldSchema>,
    ) -> Result<()> {
        let inner = Arc::clone(&self.inner);
        let name = column_name.into();
        run_blocking(move || inner.alter_column(&name, new_name.as_deref(), new_schema.as_ref()))
            .await
    }

    /// Runs `Collection::insert` over `docs` on a blocking task.
    pub async fn insert(&self, docs: Vec<Doc>) -> Result<WriteSummary> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || {
            // The synchronous API is called with a slice of references, so
            // borrow from the owned vector inside the blocking closure.
            let refs: Vec<&Doc> = docs.iter().collect();
            inner.insert(&refs)
        })
        .await
    }

    /// Runs `Collection::insert_with_results` over `docs` on a blocking task.
    pub async fn insert_with_results(&self, docs: Vec<Doc>) -> Result<Vec<WriteResult>> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || {
            let refs: Vec<&Doc> = docs.iter().collect();
            inner.insert_with_results(&refs)
        })
        .await
    }

    /// Runs `Collection::update` over `docs` on a blocking task.
    pub async fn update(&self, docs: Vec<Doc>) -> Result<WriteSummary> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || {
            let refs: Vec<&Doc> = docs.iter().collect();
            inner.update(&refs)
        })
        .await
    }

    /// Runs `Collection::update_with_results` over `docs` on a blocking task.
    pub async fn update_with_results(&self, docs: Vec<Doc>) -> Result<Vec<WriteResult>> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || {
            let refs: Vec<&Doc> = docs.iter().collect();
            inner.update_with_results(&refs)
        })
        .await
    }

    /// Runs `Collection::upsert` over `docs` on a blocking task.
    pub async fn upsert(&self, docs: Vec<Doc>) -> Result<WriteSummary> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || {
            let refs: Vec<&Doc> = docs.iter().collect();
            inner.upsert(&refs)
        })
        .await
    }

    /// Runs `Collection::upsert_with_results` over `docs` on a blocking task.
    pub async fn upsert_with_results(&self, docs: Vec<Doc>) -> Result<Vec<WriteResult>> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || {
            let refs: Vec<&Doc> = docs.iter().collect();
            inner.upsert_with_results(&refs)
        })
        .await
    }

    /// Runs `Collection::delete` for the given keys on a blocking task.
    pub async fn delete(&self, pks: Vec<String>) -> Result<WriteSummary> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || {
            // Lend the owned keys as `&str` for the synchronous call.
            let refs: Vec<&str> = pks.iter().map(String::as_str).collect();
            inner.delete(&refs)
        })
        .await
    }

    /// Runs `Collection::delete_by_filter` with `filter` on a blocking task.
    pub async fn delete_by_filter(&self, filter: impl Into<String>) -> Result<()> {
        let inner = Arc::clone(&self.inner);
        let filter = filter.into();
        run_blocking(move || inner.delete_by_filter(&filter)).await
    }

    /// Runs `Collection::insert_iter` on a blocking task.
    ///
    /// `docs` and `batch_size` are forwarded unchanged. `docs` must be
    /// `Send + 'static` because it is moved into the blocking closure.
    pub async fn insert_iter<I>(&self, docs: I, batch_size: usize) -> Result<WriteSummary>
    where
        I: IntoIterator<Item = Doc> + Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || inner.insert_iter(docs, batch_size)).await
    }

    /// Runs `Collection::upsert_iter` on a blocking task.
    ///
    /// `docs` and `batch_size` are forwarded unchanged. `docs` must be
    /// `Send + 'static` because it is moved into the blocking closure.
    pub async fn upsert_iter<I>(&self, docs: I, batch_size: usize) -> Result<WriteSummary>
    where
        I: IntoIterator<Item = Doc> + Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || inner.upsert_iter(docs, batch_size)).await
    }

    /// Runs `Collection::update_iter` on a blocking task.
    ///
    /// `docs` and `batch_size` are forwarded unchanged. `docs` must be
    /// `Send + 'static` because it is moved into the blocking closure.
    pub async fn update_iter<I>(&self, docs: I, batch_size: usize) -> Result<WriteSummary>
    where
        I: IntoIterator<Item = Doc> + Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || inner.update_iter(docs, batch_size)).await
    }

    /// Runs `Collection::query` with `query` on a blocking task and returns
    /// the resulting `DocSet`.
    pub async fn query(&self, query: VectorQuery) -> Result<DocSet> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || inner.query(&query)).await
    }

    /// Runs `Collection::fetch` for the given keys on a blocking task and
    /// returns the resulting `DocSet`.
    pub async fn fetch(&self, pks: Vec<String>) -> Result<DocSet> {
        let inner = Arc::clone(&self.inner);
        run_blocking(move || {
            // Lend the owned keys as `&str` for the synchronous call.
            let refs: Vec<&str> = pks.iter().map(String::as_str).collect();
            inner.fetch(&refs)
        })
        .await
    }
}
/// Runs the fallible closure `f` via tokio's `spawn_blocking` and awaits it.
///
/// If the task is joined successfully, the closure's own `Result` is
/// returned unchanged. If joining fails, the join error is converted into a
/// `ZvecError` with `ErrorCode::Internal` whose message embeds the join
/// error's `Display` output.
async fn run_blocking<F, T>(f: F) -> Result<T>
where
    F: FnOnce() -> Result<T> + Send + 'static,
    T: Send + 'static,
{
    let joined = spawn_blocking(f).await;
    // Outer `Ok` carries the closure's result; outer `Err` is the join failure.
    joined.unwrap_or_else(|join_err| {
        Err(ZvecError::with_message(
            ErrorCode::Internal,
            format!("tokio blocking task failed: {join_err}"),
        ))
    })
}