1use anyhow::{Context, Result};
57use std::sync::Arc;
58use tracing::{debug, info, warn};
59
60use crate::embeddings::{EmbeddingProvider, EmbeddingProviderFactory};
61use crate::search_config::{BackendType, SearchConfig};
62use crate::vector_store::{
63 EmbeddedDocument, DocumentMetadata, FileVectorStore, Filter, InMemoryVectorStore, VectorStore,
64};
65
66#[cfg(feature = "ai-ingestion")]
67use crate::generation::{ExampleGenerator, GeneratorConfig, GenerationEvent, GeneratedExample, create_llm_provider};
68#[cfg(feature = "ai-ingestion")]
69use crate::skill_md::ToolDocumentation;
70#[cfg(feature = "ai-ingestion")]
71use futures_util::Stream;
72#[cfg(feature = "ai-ingestion")]
73use tokio_stream::StreamExt;
74
75#[cfg(feature = "qdrant")]
76use crate::vector_store::QdrantVectorStore;
77
78#[cfg(feature = "hybrid-search")]
79use super::{BM25Index, BM25Config};
80
81#[cfg(feature = "hybrid-search")]
82use tokio::sync::RwLock;
83
84#[cfg(feature = "reranker")]
85use super::{FastEmbedReranker, RerankerConfig as SearchRerankerConfig, Reranker, RerankDocument};
86
87#[cfg(feature = "context-compression")]
88use super::{ContextCompressor, CompressionConfig, CompressedToolContext};
89
90use super::{QueryProcessor, ProcessedQuery};
91
/// A single hit returned by the search pipeline.
#[derive(Debug, Clone)]
pub struct PipelineSearchResult {
    /// Document identifier as stored in the vector store.
    pub id: String,
    /// Document content; empty string when the store returned no content.
    pub content: String,
    /// First-stage retrieval score (dense similarity, or RRF-fused for hybrid).
    pub score: f32,
    /// Metadata carried through unchanged from indexing.
    pub metadata: DocumentMetadata,
    /// Cross-encoder score; `Some` only when the reranker actually ran.
    pub rerank_score: Option<f32>,
}
106
/// Statistics produced by one indexing call.
#[derive(Debug, Clone, Default)]
pub struct PipelineIndexStats {
    /// Documents newly inserted by this call.
    pub documents_added: usize,
    /// Existing documents overwritten by this call.
    pub documents_updated: usize,
    /// Total documents in the store after the call (best-effort count).
    pub total_documents: usize,
    /// On-disk index size, when the backend can report it (currently unset).
    pub index_size_bytes: Option<usize>,
}
119
/// Aggregate health report for the pipeline and its components.
#[derive(Debug, Clone)]
pub struct PipelineHealth {
    /// True only when every mandatory component reported healthy.
    pub healthy: bool,
    /// Status of the embedding provider.
    pub embedding_provider: ProviderStatus,
    /// Status of the vector-store backend.
    pub vector_store: ProviderStatus,
    /// BM25 status; `None` when hybrid search is disabled or not compiled in.
    pub bm25_index: Option<ProviderStatus>,
    /// Reranker status; `None` when reranking is disabled or not compiled in.
    pub reranker: Option<ProviderStatus>,
    /// AI example-generator status; `None` when AI ingestion is off.
    pub example_generator: Option<ProviderStatus>,
    /// Number of documents currently indexed (0 if the count failed).
    pub indexed_documents: usize,
}
138
/// Health status of a single pipeline component.
#[derive(Debug, Clone)]
pub struct ProviderStatus {
    /// Human-readable component name (provider/backend/model).
    pub name: String,
    /// Whether the component reported itself healthy.
    pub healthy: bool,
    /// Error description when unhealthy; `None` when healthy.
    pub error: Option<String>,
}
149
/// Input document to be embedded and indexed by the pipeline.
#[derive(Debug, Clone)]
pub struct IndexDocument {
    /// Unique identifier; re-using an id upserts (overwrites) the document.
    pub id: String,
    /// Raw text that will be embedded and stored.
    pub content: String,
    /// Arbitrary metadata stored alongside the vector (skill/tool names, …).
    pub metadata: DocumentMetadata,
}
160
/// End-to-end retrieval pipeline: query processing → embedding → (optionally
/// hybrid) retrieval → optional reranking → optional context compression.
/// Optional stages are both feature-gated at compile time and toggled by
/// configuration at runtime.
pub struct SearchPipeline {
    /// Validated configuration the pipeline was built from.
    config: SearchConfig,
    /// Produces query/document embeddings.
    embedding_provider: Arc<dyn EmbeddingProvider>,
    /// Dense vector storage and search backend.
    vector_store: Arc<dyn VectorStore>,
    /// Sparse BM25 index for hybrid retrieval; `None` when hybrid is disabled.
    #[cfg(feature = "hybrid-search")]
    bm25_index: Option<Arc<RwLock<BM25Index>>>,
    /// Second-stage cross-encoder reranker; `None` when disabled.
    #[cfg(feature = "reranker")]
    reranker: Option<Arc<dyn Reranker>>,
    /// Compresses results into token-budgeted tool contexts.
    #[cfg(feature = "context-compression")]
    compressor: Option<ContextCompressor>,
    /// LLM-backed example generator used during ingestion; `None` when off.
    #[cfg(feature = "ai-ingestion")]
    example_generator: Option<Arc<ExampleGenerator>>,
    /// Normalizes queries and detects intent; rebuilt when known names change.
    query_processor: QueryProcessor,
    /// Skill names registered so the query processor can recognize them.
    known_skills: Vec<String>,
    /// Tool names registered so the query processor can recognize them.
    known_tools: Vec<String>,
}
188
189impl SearchPipeline {
    /// Construct a fully-wired pipeline from `config`.
    ///
    /// Builds, in order: the embedding provider, the vector-store backend
    /// (file / in-memory / Qdrant), then the feature-gated optional stages:
    /// BM25 index, reranker, context compressor, and AI example generator.
    ///
    /// # Errors
    /// Fails on invalid configuration or when a mandatory component
    /// (embedding provider, vector store, or an enabled BM25/reranker/
    /// compressor) cannot be created. A failing LLM provider for AI
    /// ingestion is downgraded to a warning and generation is disabled.
    pub async fn from_config(config: SearchConfig) -> Result<Self> {
        info!("Initializing SearchPipeline with config");

        config.validate().context("Invalid search configuration")?;

        // Translate search-level embedding settings into the embeddings
        // crate's config. An unrecognized provider string silently falls
        // back to FastEmbed rather than erroring.
        let embedding_config = crate::embeddings::EmbeddingConfig {
            provider: config.embedding.provider.parse()
                .unwrap_or(crate::embeddings::EmbeddingProviderType::FastEmbed),
            model: Some(config.embedding.model.clone()),
            api_key: config.embedding.openai_api_key.clone(),
            base_url: config.embedding.ollama_host.clone(),
            batch_size: 100, // fixed; not currently exposed in SearchConfig
        };

        let embedding_provider = EmbeddingProviderFactory::create(&embedding_config)
            .context("Failed to create embedding provider")?;

        debug!(
            "Created embedding provider: {} ({})",
            embedding_provider.provider_name(),
            embedding_provider.model_name()
        );

        // Select the vector-store backend. Qdrant is only available when the
        // "qdrant" feature is compiled in; otherwise selecting it is an error.
        let vector_store: Arc<dyn VectorStore> = match config.backend.backend_type {
            BackendType::File => {
                let file_config_from_search = config.file.as_ref();

                // Missing file config falls back to the store's default
                // storage dir and cosine distance.
                let file_config = crate::vector_store::FileConfig {
                    storage_dir: file_config_from_search.and_then(|c| c.storage_path.clone()),
                    distance_metric: file_config_from_search
                        .map(|c| c.distance_metric)
                        .unwrap_or(crate::vector_store::DistanceMetric::Cosine),
                };

                Arc::new(
                    FileVectorStore::new(file_config)
                        .context("Failed to create File vector store")?
                )
            }
            BackendType::InMemory => {
                Arc::new(InMemoryVectorStore::with_dimensions(config.embedding.dimensions))
            }
            #[cfg(feature = "qdrant")]
            BackendType::Qdrant => {
                let qdrant_config = config.qdrant.as_ref()
                    .context("Qdrant config required for qdrant backend")?;

                let qdrant_store = QdrantVectorStore::new(crate::vector_store::QdrantConfig {
                    url: qdrant_config.url.clone(),
                    api_key: qdrant_config.api_key.clone(),
                    collection_name: qdrant_config.collection.clone(),
                    vector_size: config.embedding.dimensions,
                    ..Default::default()
                }).await.context("Failed to create Qdrant store")?;

                Arc::new(qdrant_store)
            }
            #[cfg(not(feature = "qdrant"))]
            BackendType::Qdrant => {
                anyhow::bail!("Qdrant backend requires 'qdrant' feature to be enabled");
            }
        };

        debug!("Created vector store: {}", vector_store.backend_name());

        // BM25 sparse index for hybrid retrieval, runtime-toggled by config.
        #[cfg(feature = "hybrid-search")]
        let bm25_index = if config.retrieval.enable_hybrid {
            let bm25_config = BM25Config::default();
            let index = BM25Index::new(bm25_config)?;
            Some(Arc::new(RwLock::new(index)))
        } else {
            None
        };

        // Cross-encoder reranker; an unparseable model name falls back to
        // the reranker's default model.
        #[cfg(feature = "reranker")]
        let reranker: Option<Arc<dyn Reranker>> = if config.reranker.enabled {
            let reranker_config = SearchRerankerConfig {
                model: config.reranker.model.parse().unwrap_or_default(),
                top_k: config.retrieval.final_k,
                ..Default::default()
            };
            let fastembed_reranker = FastEmbedReranker::new(reranker_config)
                .context("Failed to create reranker")?;
            Some(Arc::new(fastembed_reranker))
        } else {
            None
        };

        // Context compressor is always constructed when the feature is on;
        // the configured strategy (including None) controls its behavior.
        #[cfg(feature = "context-compression")]
        let compressor = {
            let compression_config = CompressionConfig {
                max_tokens_per_result: config.context.max_tokens_per_result,
                max_total_tokens: config.context.max_total_tokens,
                // Map the config-level strategy enum onto the compressor's.
                strategy: match config.context.compression {
                    crate::search_config::CompressionStrategy::Extractive => {
                        super::CompressionStrategy::Extractive
                    }
                    crate::search_config::CompressionStrategy::Template => {
                        super::CompressionStrategy::TemplateBased
                    }
                    crate::search_config::CompressionStrategy::Progressive => {
                        super::CompressionStrategy::Progressive
                    }
                    crate::search_config::CompressionStrategy::None => {
                        super::CompressionStrategy::None
                    }
                },
                ..Default::default()
            };
            Some(ContextCompressor::new(compression_config)?)
        };

        let query_processor = QueryProcessor::new();

        // AI ingestion is best-effort: a failing LLM provider only logs a
        // warning so indexing still works without generated examples.
        #[cfg(feature = "ai-ingestion")]
        let example_generator = if config.ai_ingestion.enabled {
            match create_llm_provider(&config.ai_ingestion) {
                Ok(llm) => {
                    let gen_config = GeneratorConfig::from(&config.ai_ingestion);
                    info!(
                        "AI example generation enabled: {} / {}",
                        llm.name(),
                        llm.model()
                    );
                    Some(Arc::new(ExampleGenerator::new(llm, gen_config)))
                }
                Err(e) => {
                    warn!("Failed to create LLM provider for AI ingestion: {}", e);
                    None
                }
            }
        } else {
            None
        };

        Ok(Self {
            config,
            embedding_provider,
            vector_store,
            #[cfg(feature = "hybrid-search")]
            bm25_index,
            #[cfg(feature = "reranker")]
            reranker,
            #[cfg(feature = "context-compression")]
            compressor,
            #[cfg(feature = "ai-ingestion")]
            example_generator,
            query_processor,
            known_skills: Vec::new(),
            known_tools: Vec::new(),
        })
    }
351
352 pub async fn default_pipeline() -> Result<Self> {
354 Self::from_config(SearchConfig::default()).await
355 }
356
357 pub async fn index_documents(&self, documents: Vec<IndexDocument>) -> Result<PipelineIndexStats> {
362 if documents.is_empty() {
363 return Ok(PipelineIndexStats::default());
364 }
365
366 info!("Indexing {} documents", documents.len());
367
368 let texts: Vec<String> = documents.iter().map(|d| d.content.clone()).collect();
370
371 let embeddings = self.embedding_provider
373 .embed_documents_batched(texts)
374 .await
375 .context("Failed to generate embeddings")?;
376
377 let embedded_docs: Vec<EmbeddedDocument> = documents
379 .into_iter()
380 .zip(embeddings)
381 .map(|(doc, embedding)| EmbeddedDocument {
382 id: doc.id,
383 content: Some(doc.content),
384 embedding,
385 metadata: doc.metadata,
386 })
387 .collect();
388
389 let _doc_count = embedded_docs.len();
390
391 #[cfg(feature = "hybrid-search")]
393 if let Some(ref bm25) = self.bm25_index {
394 let mut bm25_guard = bm25.write().await;
395 for doc in &embedded_docs {
396 if let Some(ref content) = doc.content {
397 bm25_guard.add_document(&doc.id, content)?;
398 }
399 }
400 debug!("Added {} documents to BM25 index", doc_count);
401 }
402
403 let stats = self.vector_store.upsert(embedded_docs).await
405 .context("Failed to upsert to vector store")?;
406
407 let total = self.vector_store.count(None).await.unwrap_or(0);
408
409 Ok(PipelineIndexStats {
410 documents_added: stats.inserted,
411 documents_updated: stats.updated,
412 total_documents: total,
413 index_size_bytes: None,
414 })
415 }
416
417 #[cfg(feature = "ai-ingestion")]
422 pub async fn index_documents_with_generation(
423 &self,
424 documents: Vec<IndexDocument>,
425 tools: Vec<ToolDocumentation>,
426 ) -> Result<(PipelineIndexStats, Vec<GeneratedExample>)> {
427 if let Some(ref generator) = self.example_generator {
428 let enhanced = self.enhance_documents_with_examples(documents, &tools, generator).await?;
429 let all_examples = enhanced.1;
430 let stats = self.index_documents(enhanced.0).await?;
431 Ok((stats, all_examples))
432 } else {
433 let stats = self.index_documents(documents).await?;
434 Ok((stats, Vec::new()))
435 }
436 }
437
    /// Stream variant of [`Self::index_documents_with_generation`].
    ///
    /// Yields every `GenerationEvent` as examples are produced, then indexes
    /// the (possibly enhanced) documents and yields a final `Completed` or
    /// `Error` event. Without a generator it skips straight to indexing.
    #[cfg(feature = "ai-ingestion")]
    pub fn index_documents_stream<'a>(
        &'a self,
        documents: Vec<IndexDocument>,
        tools: Vec<ToolDocumentation>,
    ) -> impl Stream<Item = GenerationEvent> + 'a {
        async_stream::stream! {
            if let Some(ref generator) = self.example_generator {
                let total_tools = tools.len();
                let mut all_examples = Vec::new();

                // Drain each tool's generation stream, collecting examples
                // while forwarding every event to the caller.
                for (idx, tool) in tools.iter().enumerate() {
                    let mut stream = Box::pin(generator.generate_stream(tool, idx + 1, total_tools));

                    while let Some(event) = stream.next().await {
                        if let GenerationEvent::Example { ref example } = event {
                            all_examples.push(example.clone());
                        }
                        yield event;
                    }
                }

                // Fold the collected examples into document content before embedding.
                let enhanced_docs = self.enhance_documents_inline(&documents, &all_examples);

                match self.index_documents(enhanced_docs).await {
                    Ok(stats) => {
                        yield GenerationEvent::Completed {
                            total_examples: all_examples.len(),
                            total_valid: all_examples.iter().filter(|e| e.validated).count(),
                            total_tools,
                            // NOTE(review): duration is not measured on this path.
                            duration_ms: 0, };
                        info!(
                            "Indexed {} documents with {} generated examples",
                            stats.total_documents, all_examples.len()
                        );
                    }
                    Err(e) => {
                        // Indexing failure after generation is terminal for the stream.
                        yield GenerationEvent::Error {
                            message: format!("Failed to index documents: {}", e),
                            recoverable: false,
                            tool_name: None,
                        };
                    }
                }
            } else {
                // No generator configured: plain indexing, no example events.
                match self.index_documents(documents).await {
                    Ok(stats) => {
                        yield GenerationEvent::Completed {
                            total_examples: 0,
                            total_valid: 0,
                            total_tools: tools.len(),
                            duration_ms: 0,
                        };
                        info!("Indexed {} documents (no AI generation)", stats.total_documents);
                    }
                    Err(e) => {
                        yield GenerationEvent::Error {
                            message: format!("Failed to index documents: {}", e),
                            recoverable: false,
                            tool_name: None,
                        };
                    }
                }
            }
        }
    }
514
515 #[cfg(feature = "ai-ingestion")]
517 async fn enhance_documents_with_examples(
518 &self,
519 documents: Vec<IndexDocument>,
520 tools: &[ToolDocumentation],
521 generator: &ExampleGenerator,
522 ) -> Result<(Vec<IndexDocument>, Vec<GeneratedExample>)> {
523 let mut all_examples = Vec::new();
524
525 for tool in tools {
527 match generator.generate(tool).await {
528 Ok(examples) => {
529 info!(
530 "Generated {} examples for tool '{}'",
531 examples.len(), tool.name
532 );
533 all_examples.extend(examples);
534 }
535 Err(e) => {
536 warn!("Failed to generate examples for '{}': {}", tool.name, e);
537 }
538 }
539 }
540
541 let enhanced = self.enhance_documents_inline(&documents, &all_examples);
543
544 Ok((enhanced, all_examples))
545 }
546
547 #[cfg(feature = "ai-ingestion")]
549 fn enhance_documents_inline(
550 &self,
551 documents: &[IndexDocument],
552 examples: &[GeneratedExample],
553 ) -> Vec<IndexDocument> {
554 if examples.is_empty() {
555 return documents.to_vec();
556 }
557
558 let example_text = Self::format_examples_for_embedding(examples);
560
561 documents
563 .iter()
564 .map(|doc| {
565 let enhanced_content = format!(
568 "{}\n\n## Generated Examples\n\n{}",
569 doc.content, example_text
570 );
571
572 IndexDocument {
573 id: doc.id.clone(),
574 content: enhanced_content,
575 metadata: doc.metadata.clone(),
576 }
577 })
578 .collect()
579 }
580
581 #[cfg(feature = "ai-ingestion")]
583 fn format_examples_for_embedding(examples: &[GeneratedExample]) -> String {
584 examples
585 .iter()
586 .map(|e| {
587 format!(
588 "Example: {}\n{}",
589 e.command,
590 e.explanation
591 )
592 })
593 .collect::<Vec<_>>()
594 .join("\n\n")
595 }
596
    #[cfg(feature = "ai-ingestion")]
    /// Whether an AI example generator was successfully configured.
    pub fn has_example_generator(&self) -> bool {
        self.example_generator.is_some()
    }
602
603 #[cfg(feature = "ai-ingestion")]
605 pub fn example_generator_info(&self) -> Option<(&str, &str)> {
606 self.example_generator.as_ref().map(|g| {
607 (g.provider_name(), g.model_name())
608 })
609 }
610
    /// Run the full search pipeline for `query`, returning at most `top_k`
    /// results: query processing → embedding → first-stage retrieval
    /// (hybrid when enabled) → optional reranking.
    ///
    /// # Errors
    /// Fails when query embedding, retrieval, or reranking fails.
    pub async fn search(&self, query: &str, top_k: usize) -> Result<Vec<PipelineSearchResult>> {
        debug!("Searching for: {} (top_k={})", query, top_k);

        // Prefer the normalized query; fall back to the raw text if
        // normalization produced an empty string.
        let processed = self.query_processor.process(query);
        let search_query = if !processed.normalized.is_empty() {
            &processed.normalized
        } else {
            query
        };

        debug!(
            "Query processed: intent={:?}, confidence={:.2}",
            processed.intent, processed.intent_confidence
        );

        let query_embedding = self.embedding_provider
            .embed_query(search_query)
            .await
            .context("Failed to embed query")?;

        // Over-fetch in the first stage so the reranker has candidates to
        // choose from; at least 2x the requested top_k.
        let first_stage_k = self.config.retrieval.first_stage_k.max(top_k * 2);

        let candidates = self.retrieve_candidates(&query_embedding, search_query, first_stage_k).await?;

        if candidates.is_empty() {
            return Ok(Vec::new());
        }

        // Rerank with the ORIGINAL query text (not the normalized form) when
        // a reranker is configured; otherwise truncate by retrieval score.
        #[cfg(feature = "reranker")]
        let reranked = if let Some(ref reranker) = self.reranker {
            self.rerank_results(reranker.as_ref(), query, candidates, top_k).await?
        } else {
            candidates.into_iter().take(top_k).collect()
        };

        #[cfg(not(feature = "reranker"))]
        let reranked: Vec<PipelineSearchResult> = candidates.into_iter().take(top_k).collect();

        Ok(reranked)
    }
664
665 pub async fn search_with_filter(
667 &self,
668 query: &str,
669 filter: Filter,
670 top_k: usize,
671 ) -> Result<Vec<PipelineSearchResult>> {
672 let query_embedding = self.embedding_provider
673 .embed_query(query)
674 .await
675 .context("Failed to embed query")?;
676
677 let results = self.vector_store
678 .search(query_embedding, Some(filter), top_k)
679 .await
680 .context("Vector search failed")?;
681
682 Ok(results
683 .into_iter()
684 .map(|r| PipelineSearchResult {
685 id: r.id,
686 content: r.content.unwrap_or_default(),
687 score: r.score,
688 metadata: r.metadata,
689 rerank_score: None,
690 })
691 .collect())
692 }
693
694 async fn retrieve_candidates(
696 &self,
697 query_embedding: &[f32],
698 _query_text: &str,
699 k: usize,
700 ) -> Result<Vec<PipelineSearchResult>> {
701 #[cfg(feature = "hybrid-search")]
702 if self.config.retrieval.enable_hybrid {
703 if let Some(ref bm25) = self.bm25_index {
704 return self.hybrid_retrieve(query_embedding, query_text, bm25, k).await;
705 }
706 }
707
708 let results = self.vector_store
710 .search(query_embedding.to_vec(), None, k)
711 .await
712 .context("Vector search failed")?;
713
714 Ok(results
715 .into_iter()
716 .map(|r| PipelineSearchResult {
717 id: r.id,
718 content: r.content.unwrap_or_default(),
719 score: r.score,
720 metadata: r.metadata,
721 rerank_score: None,
722 })
723 .collect())
724 }
725
    /// Hybrid retrieval: run dense and BM25 searches, fuse their rankings
    /// with reciprocal rank fusion, and materialize the top `k` fused hits.
    #[cfg(feature = "hybrid-search")]
    async fn hybrid_retrieve(
        &self,
        query_embedding: &[f32],
        query_text: &str,
        bm25: &Arc<RwLock<BM25Index>>,
        k: usize,
    ) -> Result<Vec<PipelineSearchResult>> {
        use super::reciprocal_rank_fusion;

        let dense_results = self.vector_store
            .search(query_embedding.to_vec(), None, k)
            .await
            .context("Dense search failed")?;

        // Read lock is held only for the sparse query.
        let bm25_guard = bm25.read().await;
        let sparse_results = bm25_guard.search(query_text, k)?;

        let dense_scores: Vec<(String, f32)> = dense_results
            .iter()
            .map(|r| (r.id.clone(), r.score))
            .collect();

        let sparse_scores: Vec<(String, f32)> = sparse_results
            .iter()
            .map(|r| (r.doc_id.clone(), r.score))
            .collect();

        // RRF combines the two rankings; rrf_k damps the rank contribution.
        let rrf_k = self.config.retrieval.rrf_k;
        let fused = reciprocal_rank_fusion(
            vec![dense_scores, sparse_scores],
            rrf_k,
        );

        let mut results: Vec<PipelineSearchResult> = Vec::with_capacity(k);

        for (id, score) in fused.into_iter().take(k) {
            if let Some(dense_match) = dense_results.iter().find(|r| r.id == id) {
                // Dense hit already carries content and metadata.
                results.push(PipelineSearchResult {
                    id: dense_match.id.clone(),
                    content: dense_match.content.clone().unwrap_or_default(),
                    score,
                    metadata: dense_match.metadata.clone(),
                    rerank_score: None,
                });
            } else if let Some(_sparse_match) = sparse_results.iter().find(|r| r.doc_id == id) {
                // Sparse-only hit: the BM25 index stores no payload, so fetch
                // the document from the vector store. A failed/empty fetch
                // silently drops the hit.
                // NOTE(review): this is one store round-trip per sparse-only
                // id; batching the ids into a single get() would be cheaper.
                if let Ok(docs) = self.vector_store.get(vec![id.clone()]).await {
                    if let Some(doc) = docs.into_iter().next() {
                        results.push(PipelineSearchResult {
                            id: doc.id,
                            content: doc.content.unwrap_or_default(),
                            score,
                            metadata: doc.metadata,
                            rerank_score: None,
                        });
                    }
                }
            }
        }

        Ok(results)
    }
796
797 #[cfg(feature = "reranker")]
799 async fn rerank_results(
800 &self,
801 reranker: &dyn Reranker,
802 query: &str,
803 candidates: Vec<PipelineSearchResult>,
804 top_k: usize,
805 ) -> Result<Vec<PipelineSearchResult>> {
806 if candidates.is_empty() {
807 return Ok(candidates);
808 }
809
810 let rerank_docs: Vec<RerankDocument> = candidates
811 .iter()
812 .map(|r| RerankDocument {
813 id: r.id.clone(),
814 text: r.content.clone(),
815 })
816 .collect();
817
818 let reranked = reranker.rerank(query, rerank_docs, top_k)?;
819
820 let results: Vec<PipelineSearchResult> = reranked
822 .into_iter()
823 .filter_map(|rr| {
824 candidates.iter().find(|c| c.id == rr.document.id).map(|c| {
825 PipelineSearchResult {
826 id: c.id.clone(),
827 content: c.content.clone(),
828 score: c.score,
829 metadata: c.metadata.clone(),
830 rerank_score: Some(rr.score),
831 }
832 })
833 })
834 .collect();
835
836 Ok(results)
837 }
838
839 #[cfg(feature = "context-compression")]
841 pub fn compress_results(
842 &self,
843 results: &[PipelineSearchResult],
844 ) -> Result<Vec<CompressedToolContext>> {
845 let compressor = self.compressor.as_ref()
846 .context("Context compression not enabled")?;
847
848 let tools: Vec<_> = results
849 .iter()
850 .map(|r| {
851 super::CompressedToolContext {
853 name: r.metadata.tool_name.clone().unwrap_or_else(|| r.id.clone()),
854 description: r.content.clone(),
855 parameters: Vec::new(),
856 example: None,
857 score: r.rerank_score.unwrap_or(r.score),
858 }
859 })
860 .collect();
861
862 Ok(tools)
863 }
864
    /// Probe every pipeline component and report aggregate health.
    ///
    /// Only the mandatory components (embedding provider, vector store)
    /// affect the overall `healthy` flag; feature-gated components report
    /// presence only. Never fails — errors are folded into the report.
    pub async fn health_check(&self) -> PipelineHealth {
        let mut healthy = true;

        // Embedding provider: distinguish "reported unhealthy" from
        // "health check itself errored".
        let embedding_status = match self.embedding_provider.health_check().await {
            Ok(true) => ProviderStatus {
                name: self.embedding_provider.provider_name().to_string(),
                healthy: true,
                error: None,
            },
            Ok(false) => {
                healthy = false;
                ProviderStatus {
                    name: self.embedding_provider.provider_name().to_string(),
                    healthy: false,
                    error: Some("Provider reported unhealthy".to_string()),
                }
            }
            Err(e) => {
                healthy = false;
                ProviderStatus {
                    name: self.embedding_provider.provider_name().to_string(),
                    healthy: false,
                    error: Some(e.to_string()),
                }
            }
        };

        // Vector store. NOTE(review): an Ok(status) with status.healthy ==
        // false does not flip the aggregate `healthy` flag here, unlike the
        // embedding branch above — confirm whether that is intentional.
        let vector_status = match self.vector_store.health_check().await {
            Ok(status) => ProviderStatus {
                name: self.vector_store.backend_name().to_string(),
                healthy: status.healthy,
                error: if status.healthy { None } else { Some("Unhealthy".to_string()) },
            },
            Err(e) => {
                healthy = false;
                ProviderStatus {
                    name: self.vector_store.backend_name().to_string(),
                    healthy: false,
                    error: Some(e.to_string()),
                }
            }
        };

        // BM25 has no deep health probe; presence implies healthy.
        #[cfg(feature = "hybrid-search")]
        let bm25_status = if self.bm25_index.is_some() {
            Some(ProviderStatus {
                name: "BM25 (Tantivy)".to_string(),
                healthy: true,
                error: None,
            })
        } else {
            None
        };
        #[cfg(not(feature = "hybrid-search"))]
        let bm25_status: Option<ProviderStatus> = None;

        // Reranker: presence implies healthy.
        #[cfg(feature = "reranker")]
        let reranker_status = if let Some(ref reranker) = self.reranker {
            Some(ProviderStatus {
                name: reranker.model_name().to_string(),
                healthy: true,
                error: None,
            })
        } else {
            None
        };
        #[cfg(not(feature = "reranker"))]
        let reranker_status: Option<ProviderStatus> = None;

        // Example generator: presence implies healthy.
        #[cfg(feature = "ai-ingestion")]
        let generator_status = if let Some(ref generator) = self.example_generator {
            Some(ProviderStatus {
                name: format!("{}/{}", generator.provider_name(), generator.model_name()),
                healthy: true,
                error: None,
            })
        } else {
            None
        };
        #[cfg(not(feature = "ai-ingestion"))]
        let generator_status: Option<ProviderStatus> = None;

        // Best-effort count; failure reports 0 rather than unhealthy.
        let indexed = self.vector_store.count(None).await.unwrap_or(0);

        PipelineHealth {
            healthy,
            embedding_provider: embedding_status,
            vector_store: vector_status,
            bm25_index: bm25_status,
            reranker: reranker_status,
            example_generator: generator_status,
            indexed_documents: indexed,
        }
    }
965
    /// Number of documents currently stored in the vector store.
    pub async fn document_count(&self) -> Result<usize> {
        self.vector_store.count(None).await
    }
970
    /// Clear all indexed documents.
    ///
    /// NOTE(review): currently a stub — it only logs a warning and returns
    /// `Ok(())`; nothing is removed from the vector store or BM25 index.
    pub async fn clear(&self) -> Result<()> {
        warn!("Clear not fully implemented - documents may persist");
        Ok(())
    }
978
    /// The configuration this pipeline was built from.
    pub fn config(&self) -> &SearchConfig {
        &self.config
    }
983
984 pub fn embedding_info(&self) -> (&str, &str, usize) {
986 (
987 self.embedding_provider.provider_name(),
988 self.embedding_provider.model_name(),
989 self.embedding_provider.dimensions(),
990 )
991 }
992
    /// Register a skill name so the query processor can recognize it.
    ///
    /// Rebuilds the query processor; prefer [`Self::add_known_skills`] when
    /// registering many names at once.
    pub fn add_known_skill(&mut self, skill_name: &str) {
        self.known_skills.push(skill_name.to_string());
        self.rebuild_query_processor();
    }
998
    /// Register a tool name so the query processor can recognize it.
    ///
    /// Rebuilds the query processor; prefer [`Self::add_known_tools`] when
    /// registering many names at once.
    pub fn add_known_tool(&mut self, tool_name: &str) {
        self.known_tools.push(tool_name.to_string());
        self.rebuild_query_processor();
    }
1004
1005 pub fn add_known_skills(&mut self, skills: impl IntoIterator<Item = impl Into<String>>) {
1007 for skill in skills {
1008 self.known_skills.push(skill.into());
1009 }
1010 self.rebuild_query_processor();
1011 }
1012
1013 pub fn add_known_tools(&mut self, tools: impl IntoIterator<Item = impl Into<String>>) {
1015 for tool in tools {
1016 self.known_tools.push(tool.into());
1017 }
1018 self.rebuild_query_processor();
1019 }
1020
    /// Rebuild the query processor from the current known skill/tool names.
    /// Called after every mutation of `known_skills` / `known_tools`.
    fn rebuild_query_processor(&mut self) {
        self.query_processor = QueryProcessor::new()
            .with_skills(self.known_skills.iter().cloned())
            .with_tools(self.known_tools.iter().cloned());
    }
1027
    /// Run query normalization and intent detection without searching.
    pub fn process_query(&self, query: &str) -> ProcessedQuery {
        self.query_processor.process(query)
    }
1032}
1033
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE(review): these tests build pipelines from SearchConfig::default(),
    // so they exercise whatever default embedding provider / backend that
    // implies (and may download models on first run) — they are integration
    // tests rather than pure unit tests.

    /// Pipeline construction from the default config succeeds.
    #[tokio::test]
    async fn test_pipeline_creation() {
        let config = SearchConfig::default();
        let pipeline = SearchPipeline::from_config(config).await;
        assert!(pipeline.is_ok());
    }

    /// End-to-end: index three documents, then a relevant query returns
    /// kubernetes content first.
    #[tokio::test]
    async fn test_pipeline_index_and_search() {
        let config = SearchConfig::default();
        let pipeline = SearchPipeline::from_config(config).await.unwrap();

        let docs = vec![
            IndexDocument {
                id: "1".to_string(),
                content: "List all Kubernetes pods in the cluster".to_string(),
                metadata: DocumentMetadata {
                    skill_name: Some("kubernetes".to_string()),
                    tool_name: Some("list-pods".to_string()),
                    ..Default::default()
                },
            },
            IndexDocument {
                id: "2".to_string(),
                content: "Deploy a new application to Kubernetes".to_string(),
                metadata: DocumentMetadata {
                    skill_name: Some("kubernetes".to_string()),
                    tool_name: Some("deploy".to_string()),
                    ..Default::default()
                },
            },
            IndexDocument {
                id: "3".to_string(),
                content: "Create an S3 bucket in AWS".to_string(),
                metadata: DocumentMetadata {
                    skill_name: Some("aws".to_string()),
                    tool_name: Some("create-bucket".to_string()),
                    ..Default::default()
                },
            },
        ];

        let stats = pipeline.index_documents(docs).await.unwrap();
        assert_eq!(stats.documents_added, 3);
        assert_eq!(stats.total_documents, 3);

        let results = pipeline.search("kubernetes pods", 2).await.unwrap();
        assert!(!results.is_empty());
        assert!(results.len() <= 2);

        // Semantic relevance: the top hit should be a kubernetes document.
        assert!(results[0].content.to_lowercase().contains("kubernetes"));
    }

    /// A freshly built pipeline reports all mandatory components healthy.
    #[tokio::test]
    async fn test_pipeline_health_check() {
        let config = SearchConfig::default();
        let pipeline = SearchPipeline::from_config(config).await.unwrap();

        let health = pipeline.health_check().await;
        assert!(health.healthy);
        assert!(health.embedding_provider.healthy);
        assert!(health.vector_store.healthy);
    }

    /// Registering known names still yields a non-empty normalized query.
    #[tokio::test]
    async fn test_query_processing() {
        let config = SearchConfig::default();
        let mut pipeline = SearchPipeline::from_config(config).await.unwrap();

        pipeline.add_known_skill("kubernetes");
        pipeline.add_known_tool("list-pods");

        let processed = pipeline.process_query("how do I list k8s pods?");
        assert!(!processed.normalized.is_empty());
    }

    /// Searching an empty index returns no results rather than erroring.
    #[tokio::test]
    async fn test_empty_search() {
        let config = SearchConfig::default();
        let pipeline = SearchPipeline::from_config(config).await.unwrap();

        let results = pipeline.search("kubernetes", 5).await.unwrap();
        assert!(results.is_empty());
    }

    /// A skill filter restricts results to that skill even when the query
    /// text matches a document from another skill.
    #[tokio::test]
    async fn test_search_with_filter() {
        let config = SearchConfig::default();
        let pipeline = SearchPipeline::from_config(config).await.unwrap();

        let docs = vec![
            IndexDocument {
                id: "1".to_string(),
                content: "Kubernetes pods".to_string(),
                metadata: DocumentMetadata {
                    skill_name: Some("kubernetes".to_string()),
                    ..Default::default()
                },
            },
            IndexDocument {
                id: "2".to_string(),
                content: "AWS S3 bucket".to_string(),
                metadata: DocumentMetadata {
                    skill_name: Some("aws".to_string()),
                    ..Default::default()
                },
            },
        ];
        pipeline.index_documents(docs).await.unwrap();

        let filter = Filter::new().skill("kubernetes");
        let results = pipeline.search_with_filter("bucket", filter, 5).await.unwrap();

        // Every returned document (with a skill) must match the filter.
        for result in &results {
            if let Some(ref skill) = result.metadata.skill_name {
                assert_eq!(skill, "kubernetes");
            }
        }
    }
}