use std::sync::Arc;
use tracing::{error, info};
use crate::chunking::Chunker;
use crate::config::RagConfig;
use crate::document::{Chunk, Document, SearchResult};
use crate::embedding::EmbeddingProvider;
use crate::error::{RagError, Result};
use crate::reranker::Reranker;
use crate::vectorstore::VectorStore;
/// A retrieval-augmented-generation pipeline that ties together chunking,
/// embedding, vector storage and optional reranking.
///
/// Construct one through [`RagPipeline::builder`].
pub struct RagPipeline {
    /// Settings consulted at query time (`top_k`, `similarity_threshold`).
    config: RagConfig,
    /// Produces embeddings for chunk texts during ingestion and for queries.
    embedding_provider: Arc<dyn EmbeddingProvider>,
    /// Storage and similarity search backend for embedded chunks.
    vector_store: Arc<dyn VectorStore>,
    /// Splits documents into chunks before embedding.
    chunker: Arc<dyn Chunker>,
    /// Optional second-stage reordering of search results.
    reranker: Option<Arc<dyn Reranker>>,
}
impl RagPipeline {
pub fn builder() -> RagPipelineBuilder {
RagPipelineBuilder::default()
}
pub fn config(&self) -> &RagConfig {
&self.config
}
pub fn embedding_provider(&self) -> &Arc<dyn EmbeddingProvider> {
&self.embedding_provider
}
pub fn vector_store(&self) -> &Arc<dyn VectorStore> {
&self.vector_store
}
pub async fn create_collection(&self, name: &str) -> Result<()> {
let dimensions = self.embedding_provider.dimensions();
self.vector_store.create_collection(name, dimensions).await.map_err(|e| {
error!(collection = name, error = %e, "failed to create collection");
RagError::PipelineError(format!("failed to create collection '{name}': {e}"))
})
}
pub async fn delete_collection(&self, name: &str) -> Result<()> {
self.vector_store.delete_collection(name).await.map_err(|e| {
error!(collection = name, error = %e, "failed to delete collection");
RagError::PipelineError(format!("failed to delete collection '{name}': {e}"))
})
}
pub async fn ingest(&self, collection: &str, document: &Document) -> Result<Vec<Chunk>> {
let mut chunks = self.chunker.chunk(document);
if chunks.is_empty() {
info!(document.id = %document.id, chunk_count = 0, "ingested document (empty)");
return Ok(chunks);
}
let texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();
let embeddings = self.embedding_provider.embed_batch(&texts).await.map_err(|e| {
error!(document.id = %document.id, error = %e, "embedding failed during ingestion");
RagError::PipelineError(format!("embedding failed for document '{}': {e}", document.id))
})?;
for (chunk, embedding) in chunks.iter_mut().zip(embeddings) {
chunk.embedding = embedding;
}
self.vector_store.upsert(collection, &chunks).await.map_err(|e| {
error!(document.id = %document.id, error = %e, "upsert failed during ingestion");
RagError::PipelineError(format!("upsert failed for document '{}': {e}", document.id))
})?;
let chunk_count = chunks.len();
info!(document.id = %document.id, chunk_count, "ingested document");
Ok(chunks)
}
pub async fn ingest_batch(
&self,
collection: &str,
documents: &[Document],
) -> Result<Vec<Chunk>> {
let mut all_chunks = Vec::new();
for document in documents {
let chunks = self.ingest(collection, document).await?;
all_chunks.extend(chunks);
}
Ok(all_chunks)
}
pub async fn query(&self, collection: &str, query: &str) -> Result<Vec<SearchResult>> {
let query_embedding = self.embedding_provider.embed(query).await.map_err(|e| {
error!(error = %e, "embedding failed during query");
RagError::PipelineError(format!("query embedding failed: {e}"))
})?;
let results = self
.vector_store
.search(collection, &query_embedding, self.config.top_k)
.await
.map_err(|e| {
error!(collection, error = %e, "vector store search failed");
RagError::PipelineError(format!("search failed in collection '{collection}': {e}"))
})?;
let results = if let Some(reranker) = &self.reranker {
reranker.rerank(query, results).await.map_err(|e| {
error!(error = %e, "reranking failed");
RagError::PipelineError(format!("reranking failed: {e}"))
})?
} else {
results
};
let threshold = self.config.similarity_threshold;
let filtered: Vec<SearchResult> =
results.into_iter().filter(|r| r.score >= threshold).collect();
info!(result_count = filtered.len(), "query completed");
Ok(filtered)
}
}
/// Step-by-step constructor for [`RagPipeline`].
///
/// `config`, `embedding_provider`, `vector_store` and `chunker` must be set
/// before calling `build`. `reranker` is optional.
#[derive(Default)]
pub struct RagPipelineBuilder {
    /// Pipeline settings (required).
    config: Option<RagConfig>,
    /// Embedding backend (required).
    embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
    /// Vector storage backend (required).
    vector_store: Option<Arc<dyn VectorStore>>,
    /// Document chunking strategy (required).
    chunker: Option<Arc<dyn Chunker>>,
    /// Second-stage reranker (optional).
    reranker: Option<Arc<dyn Reranker>>,
}
impl RagPipelineBuilder {
    /// Sets the pipeline configuration (required).
    pub fn config(self, config: RagConfig) -> Self {
        Self {
            config: Some(config),
            ..self
        }
    }

    /// Sets the embedding provider (required).
    pub fn embedding_provider(self, provider: Arc<dyn EmbeddingProvider>) -> Self {
        Self {
            embedding_provider: Some(provider),
            ..self
        }
    }

    /// Sets the vector store (required).
    pub fn vector_store(self, store: Arc<dyn VectorStore>) -> Self {
        Self {
            vector_store: Some(store),
            ..self
        }
    }

    /// Sets the chunker (required).
    pub fn chunker(self, chunker: Arc<dyn Chunker>) -> Self {
        Self {
            chunker: Some(chunker),
            ..self
        }
    }

    /// Sets an optional reranker applied to search results.
    pub fn reranker(self, reranker: Arc<dyn Reranker>) -> Self {
        Self {
            reranker: Some(reranker),
            ..self
        }
    }

    /// Assembles the [`RagPipeline`].
    ///
    /// # Errors
    ///
    /// Returns [`RagError::ConfigError`] naming the first missing required
    /// component. Components are checked in this order: `config`,
    /// `embedding_provider`, `vector_store`, `chunker`.
    pub fn build(self) -> Result<RagPipeline> {
        let config = match self.config {
            Some(value) => value,
            None => return Err(RagError::ConfigError("config is required".to_string())),
        };
        let embedding_provider = match self.embedding_provider {
            Some(value) => value,
            None => {
                return Err(RagError::ConfigError("embedding_provider is required".to_string()))
            }
        };
        let vector_store = match self.vector_store {
            Some(value) => value,
            None => return Err(RagError::ConfigError("vector_store is required".to_string())),
        };
        let chunker = match self.chunker {
            Some(value) => value,
            None => return Err(RagError::ConfigError("chunker is required".to_string())),
        };

        let pipeline = RagPipeline {
            config,
            embedding_provider,
            vector_store,
            chunker,
            reranker: self.reranker,
        };
        Ok(pipeline)
    }
}