use anyhow::Result;
use async_trait::async_trait;
use futures_util::{stream, StreamExt};
use crate::{
ingestion::{IngestionNode, IngestionStream},
traits::Persist,
};
use super::Qdrant;
#[async_trait]
impl Persist for Qdrant {
fn batch_size(&self) -> Option<usize> {
self.batch_size
}
#[tracing::instrument(skip_all, err)]
async fn setup(&self) -> Result<()> {
tracing::debug!("Setting up Qdrant storage");
self.create_index_if_not_exists().await
}
#[tracing::instrument(skip_all, err, name = "storage.qdrant.store")]
async fn store(&self, node: crate::ingestion::IngestionNode) -> Result<IngestionNode> {
let point = node.clone().try_into()?;
self.client
.upsert_points_blocking(self.collection_name.to_string(), None, vec![point], None)
.await?;
Ok(node)
}
#[tracing::instrument(skip_all, name = "storage.qdrant.batch_store")]
async fn batch_store(&self, nodes: Vec<crate::ingestion::IngestionNode>) -> IngestionStream {
let points = nodes
.iter()
.map(|node| node.clone().try_into())
.collect::<Result<Vec<_>>>();
if points.is_err() {
return stream::iter(vec![Err(points.unwrap_err())]).boxed();
}
let points = points.unwrap();
let result = self
.client
.upsert_points_blocking(self.collection_name.to_string(), None, points, None)
.await;
if result.is_ok() {
stream::iter(nodes.into_iter().map(Ok)).boxed()
} else {
stream::iter(vec![Err(result.unwrap_err())]).boxed()
}
}
}