1use std::sync::Arc;
25
26use tracing::{error, info};
27
28use crate::chunking::Chunker;
29use crate::config::RagConfig;
30use crate::document::{Chunk, Document, SearchResult};
31use crate::embedding::EmbeddingProvider;
32use crate::error::{RagError, Result};
33use crate::reranker::Reranker;
34use crate::vectorstore::VectorStore;
35
/// Orchestrates the full retrieval-augmented-generation flow: chunking,
/// embedding, vector storage, and (optionally) reranking.
///
/// Construct via [`RagPipeline::builder`]; all components except the
/// reranker are required.
pub struct RagPipeline {
    // Pipeline-wide settings (e.g. `top_k`, `similarity_threshold` are read
    // by `query`).
    config: RagConfig,
    // Produces embeddings for both ingested chunks and incoming queries.
    embedding_provider: Arc<dyn EmbeddingProvider>,
    // Backend that persists chunk embeddings and serves similarity search.
    vector_store: Arc<dyn VectorStore>,
    // Splits documents into chunks prior to embedding.
    chunker: Arc<dyn Chunker>,
    // Optional second-stage reranker applied to raw search results.
    reranker: Option<Arc<dyn Reranker>>,
}
48
49impl RagPipeline {
50 pub fn builder() -> RagPipelineBuilder {
52 RagPipelineBuilder::default()
53 }
54
55 pub fn config(&self) -> &RagConfig {
57 &self.config
58 }
59
60 pub fn embedding_provider(&self) -> &Arc<dyn EmbeddingProvider> {
62 &self.embedding_provider
63 }
64
65 pub fn vector_store(&self) -> &Arc<dyn VectorStore> {
67 &self.vector_store
68 }
69
70 pub async fn create_collection(&self, name: &str) -> Result<()> {
79 let dimensions = self.embedding_provider.dimensions();
80 self.vector_store.create_collection(name, dimensions).await.map_err(|e| {
81 error!(collection = name, error = %e, "failed to create collection");
82 RagError::PipelineError(format!("failed to create collection '{name}': {e}"))
83 })
84 }
85
86 pub async fn delete_collection(&self, name: &str) -> Result<()> {
92 self.vector_store.delete_collection(name).await.map_err(|e| {
93 error!(collection = name, error = %e, "failed to delete collection");
94 RagError::PipelineError(format!("failed to delete collection '{name}': {e}"))
95 })
96 }
97
98 pub async fn ingest(&self, collection: &str, document: &Document) -> Result<Vec<Chunk>> {
107 let mut chunks = self.chunker.chunk(document);
109 if chunks.is_empty() {
110 info!(document.id = %document.id, chunk_count = 0, "ingested document (empty)");
111 return Ok(chunks);
112 }
113
114 let texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();
116
117 let embeddings = self.embedding_provider.embed_batch(&texts).await.map_err(|e| {
119 error!(document.id = %document.id, error = %e, "embedding failed during ingestion");
120 RagError::PipelineError(format!("embedding failed for document '{}': {e}", document.id))
121 })?;
122
123 for (chunk, embedding) in chunks.iter_mut().zip(embeddings) {
125 chunk.embedding = embedding;
126 }
127
128 self.vector_store.upsert(collection, &chunks).await.map_err(|e| {
130 error!(document.id = %document.id, error = %e, "upsert failed during ingestion");
131 RagError::PipelineError(format!("upsert failed for document '{}': {e}", document.id))
132 })?;
133
134 let chunk_count = chunks.len();
135 info!(document.id = %document.id, chunk_count, "ingested document");
136
137 Ok(chunks)
138 }
139
140 pub async fn ingest_batch(
149 &self,
150 collection: &str,
151 documents: &[Document],
152 ) -> Result<Vec<Chunk>> {
153 let mut all_chunks = Vec::new();
154 for document in documents {
155 let chunks = self.ingest(collection, document).await?;
156 all_chunks.extend(chunks);
157 }
158 Ok(all_chunks)
159 }
160
161 pub async fn query(&self, collection: &str, query: &str) -> Result<Vec<SearchResult>> {
170 let query_embedding = self.embedding_provider.embed(query).await.map_err(|e| {
172 error!(error = %e, "embedding failed during query");
173 RagError::PipelineError(format!("query embedding failed: {e}"))
174 })?;
175
176 let results = self
178 .vector_store
179 .search(collection, &query_embedding, self.config.top_k)
180 .await
181 .map_err(|e| {
182 error!(collection, error = %e, "vector store search failed");
183 RagError::PipelineError(format!("search failed in collection '{collection}': {e}"))
184 })?;
185
186 let results = if let Some(reranker) = &self.reranker {
188 reranker.rerank(query, results).await.map_err(|e| {
189 error!(error = %e, "reranking failed");
190 RagError::PipelineError(format!("reranking failed: {e}"))
191 })?
192 } else {
193 results
194 };
195
196 let threshold = self.config.similarity_threshold;
198 let filtered: Vec<SearchResult> =
199 results.into_iter().filter(|r| r.score >= threshold).collect();
200
201 info!(result_count = filtered.len(), "query completed");
202
203 Ok(filtered)
204 }
205}
206
/// Builder for [`RagPipeline`]; obtained via [`RagPipeline::builder`].
///
/// `config`, `embedding_provider`, `vector_store`, and `chunker` must be set
/// before calling `build`; `reranker` is optional.
#[derive(Default)]
pub struct RagPipelineBuilder {
    // Required: pipeline settings.
    config: Option<RagConfig>,
    // Required: embedding backend.
    embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
    // Required: vector storage backend.
    vector_store: Option<Arc<dyn VectorStore>>,
    // Required: document chunking strategy.
    chunker: Option<Arc<dyn Chunker>>,
    // Optional: second-stage result reranker.
    reranker: Option<Arc<dyn Reranker>>,
}
231
232impl RagPipelineBuilder {
233 pub fn config(mut self, config: RagConfig) -> Self {
235 self.config = Some(config);
236 self
237 }
238
239 pub fn embedding_provider(mut self, provider: Arc<dyn EmbeddingProvider>) -> Self {
241 self.embedding_provider = Some(provider);
242 self
243 }
244
245 pub fn vector_store(mut self, store: Arc<dyn VectorStore>) -> Self {
247 self.vector_store = Some(store);
248 self
249 }
250
251 pub fn chunker(mut self, chunker: Arc<dyn Chunker>) -> Self {
253 self.chunker = Some(chunker);
254 self
255 }
256
257 pub fn reranker(mut self, reranker: Arc<dyn Reranker>) -> Self {
259 self.reranker = Some(reranker);
260 self
261 }
262
263 pub fn build(self) -> Result<RagPipeline> {
269 let config =
270 self.config.ok_or_else(|| RagError::ConfigError("config is required".to_string()))?;
271 let embedding_provider = self
272 .embedding_provider
273 .ok_or_else(|| RagError::ConfigError("embedding_provider is required".to_string()))?;
274 let vector_store = self
275 .vector_store
276 .ok_or_else(|| RagError::ConfigError("vector_store is required".to_string()))?;
277 let chunker =
278 self.chunker.ok_or_else(|| RagError::ConfigError("chunker is required".to_string()))?;
279
280 Ok(RagPipeline {
281 config,
282 embedding_provider,
283 vector_store,
284 chunker,
285 reranker: self.reranker,
286 })
287 }
288}