skill_runtime/search/
pipeline.rs

1//! SearchPipeline orchestrator for end-to-end RAG search
2//!
3//! Provides a unified interface for semantic search that orchestrates:
4//! - Embedding generation (FastEmbed, OpenAI, Ollama)
5//! - Vector storage (InMemory, Qdrant)
6//! - Hybrid retrieval (dense + BM25)
7//! - Cross-encoder reranking
8//! - Context compression
9//! - Query understanding
10//!
11//! # Architecture
12//!
13//! The pipeline is designed to be **stateless per CLI invocation** but benefits from
14//! filesystem caches for fast subsequent runs:
15//!
16//! ```text
17//! ┌─────────────────────────────────────────────────────────────────────────┐
18//! │                          SearchPipeline                                  │
19//! │                                                                          │
20//! │  ┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐    │
21//! │  │   Embedding     │     │   Vector Store  │     │   BM25 Index    │    │
22//! │  │   Provider      │     │   (InMemory/    │     │   (Tantivy)     │    │
23//! │  │   (FastEmbed)   │     │    Qdrant)      │     │   [optional]    │    │
24//! │  └────────┬────────┘     └────────┬────────┘     └────────┬────────┘    │
25//! │           │                       │                       │             │
26//! │           ▼                       ▼                       ▼             │
27//! │  ┌─────────────────────────────────────────────────────────────────┐    │
28//! │  │                     Filesystem Caches                            │    │
29//! │  │  ~/.fastembed_cache/   ~/.skill-engine/index/   ~/.skill-engine/ │    │
30//! │  │  (model weights)       (index metadata)         (search.toml)    │    │
31//! │  └─────────────────────────────────────────────────────────────────┘    │
32//! │                                                                          │
33//! └─────────────────────────────────────────────────────────────────────────┘
34//! ```
35//!
36//! **CLI Mode**: Pipeline is created per-invocation, indexes on-demand, benefits from caches.
37//! **MCP Mode**: Pipeline is created once at server startup, kept in memory.
38//!
39//! # Example
40//!
41//! ```ignore
42//! use skill_runtime::search::{SearchPipeline, SearchConfig};
43//!
44//! // Create pipeline from config
45//! let config = SearchConfig::default();
46//! let mut pipeline = SearchPipeline::from_config(config).await?;
47//!
48//! // Index skills
49//! let skills = load_skills().await?;
50//! pipeline.index_documents(skills).await?;
51//!
52//! // Search
53//! let results = pipeline.search("deploy kubernetes pods", 5).await?;
54//! ```
55
56use anyhow::{Context, Result};
57use std::sync::Arc;
58use tracing::{debug, info, warn};
59
60use crate::embeddings::{EmbeddingProvider, EmbeddingProviderFactory};
61use crate::search_config::{BackendType, SearchConfig};
62use crate::vector_store::{
63    EmbeddedDocument, DocumentMetadata, FileVectorStore, Filter, InMemoryVectorStore, VectorStore,
64};
65
66#[cfg(feature = "ai-ingestion")]
67use crate::generation::{ExampleGenerator, GeneratorConfig, GenerationEvent, GeneratedExample, create_llm_provider};
68#[cfg(feature = "ai-ingestion")]
69use crate::skill_md::ToolDocumentation;
70#[cfg(feature = "ai-ingestion")]
71use futures_util::Stream;
72#[cfg(feature = "ai-ingestion")]
73use tokio_stream::StreamExt;
74
75#[cfg(feature = "qdrant")]
76use crate::vector_store::QdrantVectorStore;
77
78#[cfg(feature = "hybrid-search")]
79use super::{BM25Index, BM25Config};
80
81#[cfg(feature = "hybrid-search")]
82use tokio::sync::RwLock;
83
84#[cfg(feature = "reranker")]
85use super::{FastEmbedReranker, RerankerConfig as SearchRerankerConfig, Reranker, RerankDocument};
86
87#[cfg(feature = "context-compression")]
88use super::{ContextCompressor, CompressionConfig, CompressedToolContext};
89
90use super::{QueryProcessor, ProcessedQuery};
91
92/// Result from a search operation
93#[derive(Debug, Clone)]
94pub struct PipelineSearchResult {
95    /// Document ID
96    pub id: String,
97    /// Original content
98    pub content: String,
99    /// Relevance score (0.0 - 1.0)
100    pub score: f32,
101    /// Document metadata
102    pub metadata: DocumentMetadata,
103    /// Reranker score (if reranking was performed)
104    pub rerank_score: Option<f32>,
105}
106
/// Counters describing the outcome of an indexing call.
#[derive(Clone, Debug, Default)]
pub struct PipelineIndexStats {
    /// How many documents were newly inserted.
    pub documents_added: usize,
    /// How many already-present documents were overwritten.
    pub documents_updated: usize,
    /// Document count of the index after the call.
    pub total_documents: usize,
    /// Approximate index size in bytes, when known.
    pub index_size_bytes: Option<usize>,
}
119
120/// Health status of the pipeline
121#[derive(Debug, Clone)]
122pub struct PipelineHealth {
123    /// Overall health status
124    pub healthy: bool,
125    /// Embedding provider status
126    pub embedding_provider: ProviderStatus,
127    /// Vector store status
128    pub vector_store: ProviderStatus,
129    /// BM25 index status (if enabled)
130    pub bm25_index: Option<ProviderStatus>,
131    /// Reranker status (if enabled)
132    pub reranker: Option<ProviderStatus>,
133    /// Example generator status (if AI ingestion enabled)
134    pub example_generator: Option<ProviderStatus>,
135    /// Number of indexed documents
136    pub indexed_documents: usize,
137}
138
/// Health of one pipeline component.
#[derive(Clone, Debug)]
pub struct ProviderStatus {
    /// Name of the component being reported on.
    pub name: String,
    /// `true` when the component is working.
    pub healthy: bool,
    /// Error description for an unhealthy component, if one is available.
    pub error: Option<String>,
}
149
150/// Document to be indexed
151#[derive(Debug, Clone)]
152pub struct IndexDocument {
153    /// Unique document ID
154    pub id: String,
155    /// Text content to embed and index
156    pub content: String,
157    /// Optional metadata
158    pub metadata: DocumentMetadata,
159}
160
161/// Unified search pipeline that orchestrates all RAG components
162pub struct SearchPipeline {
163    /// Configuration
164    config: SearchConfig,
165    /// Embedding provider
166    embedding_provider: Arc<dyn EmbeddingProvider>,
167    /// Vector store
168    vector_store: Arc<dyn VectorStore>,
169    /// BM25 index for hybrid search
170    #[cfg(feature = "hybrid-search")]
171    bm25_index: Option<Arc<RwLock<BM25Index>>>,
172    /// Cross-encoder reranker
173    #[cfg(feature = "reranker")]
174    reranker: Option<Arc<dyn Reranker>>,
175    /// Context compressor
176    #[cfg(feature = "context-compression")]
177    compressor: Option<ContextCompressor>,
178    /// AI example generator
179    #[cfg(feature = "ai-ingestion")]
180    example_generator: Option<Arc<ExampleGenerator>>,
181    /// Query processor
182    query_processor: QueryProcessor,
183    /// Known skills for query processing
184    known_skills: Vec<String>,
185    /// Known tools for query processing
186    known_tools: Vec<String>,
187}
188
189impl SearchPipeline {
190    /// Create a new search pipeline from configuration
191    pub async fn from_config(config: SearchConfig) -> Result<Self> {
192        info!("Initializing SearchPipeline with config");
193
194        // Validate configuration
195        config.validate().context("Invalid search configuration")?;
196
197        // Create embedding provider
198        let embedding_config = crate::embeddings::EmbeddingConfig {
199            provider: config.embedding.provider.parse()
200                .unwrap_or(crate::embeddings::EmbeddingProviderType::FastEmbed),
201            model: Some(config.embedding.model.clone()),
202            api_key: config.embedding.openai_api_key.clone(),
203            base_url: config.embedding.ollama_host.clone(),
204            batch_size: 100,
205        };
206
207        let embedding_provider = EmbeddingProviderFactory::create(&embedding_config)
208            .context("Failed to create embedding provider")?;
209
210        debug!(
211            "Created embedding provider: {} ({})",
212            embedding_provider.provider_name(),
213            embedding_provider.model_name()
214        );
215
216        // Create vector store
217        let vector_store: Arc<dyn VectorStore> = match config.backend.backend_type {
218            BackendType::File => {
219                let file_config_from_search = config.file.as_ref();
220
221                let file_config = crate::vector_store::FileConfig {
222                    storage_dir: file_config_from_search.and_then(|c| c.storage_path.clone()),
223                    distance_metric: file_config_from_search
224                        .map(|c| c.distance_metric)
225                        .unwrap_or(crate::vector_store::DistanceMetric::Cosine),
226                };
227
228                Arc::new(
229                    FileVectorStore::new(file_config)
230                        .context("Failed to create File vector store")?
231                )
232            }
233            BackendType::InMemory => {
234                Arc::new(InMemoryVectorStore::with_dimensions(config.embedding.dimensions))
235            }
236            #[cfg(feature = "qdrant")]
237            BackendType::Qdrant => {
238                let qdrant_config = config.qdrant.as_ref()
239                    .context("Qdrant config required for qdrant backend")?;
240
241                let qdrant_store = QdrantVectorStore::new(crate::vector_store::QdrantConfig {
242                    url: qdrant_config.url.clone(),
243                    api_key: qdrant_config.api_key.clone(),
244                    collection_name: qdrant_config.collection.clone(),
245                    vector_size: config.embedding.dimensions,
246                    ..Default::default()
247                }).await.context("Failed to create Qdrant store")?;
248
249                Arc::new(qdrant_store)
250            }
251            #[cfg(not(feature = "qdrant"))]
252            BackendType::Qdrant => {
253                anyhow::bail!("Qdrant backend requires 'qdrant' feature to be enabled");
254            }
255        };
256
257        debug!("Created vector store: {}", vector_store.backend_name());
258
259        // Create BM25 index if hybrid search is enabled
260        #[cfg(feature = "hybrid-search")]
261        let bm25_index = if config.retrieval.enable_hybrid {
262            let bm25_config = BM25Config::default();
263            let index = BM25Index::new(bm25_config)?;
264            Some(Arc::new(RwLock::new(index)))
265        } else {
266            None
267        };
268
269        // Create reranker if enabled
270        #[cfg(feature = "reranker")]
271        let reranker: Option<Arc<dyn Reranker>> = if config.reranker.enabled {
272            let reranker_config = SearchRerankerConfig {
273                model: config.reranker.model.parse().unwrap_or_default(),
274                top_k: config.retrieval.final_k,
275                ..Default::default()
276            };
277            let fastembed_reranker = FastEmbedReranker::new(reranker_config)
278                .context("Failed to create reranker")?;
279            Some(Arc::new(fastembed_reranker))
280        } else {
281            None
282        };
283
284        // Create context compressor if enabled
285        #[cfg(feature = "context-compression")]
286        let compressor = {
287            let compression_config = CompressionConfig {
288                max_tokens_per_result: config.context.max_tokens_per_result,
289                max_total_tokens: config.context.max_total_tokens,
290                strategy: match config.context.compression {
291                    crate::search_config::CompressionStrategy::Extractive => {
292                        super::CompressionStrategy::Extractive
293                    }
294                    crate::search_config::CompressionStrategy::Template => {
295                        super::CompressionStrategy::TemplateBased
296                    }
297                    crate::search_config::CompressionStrategy::Progressive => {
298                        super::CompressionStrategy::Progressive
299                    }
300                    crate::search_config::CompressionStrategy::None => {
301                        super::CompressionStrategy::None
302                    }
303                },
304                ..Default::default()
305            };
306            Some(ContextCompressor::new(compression_config)?)
307        };
308
309        // Create query processor
310        let query_processor = QueryProcessor::new();
311
312        // Create example generator if AI ingestion is enabled
313        #[cfg(feature = "ai-ingestion")]
314        let example_generator = if config.ai_ingestion.enabled {
315            match create_llm_provider(&config.ai_ingestion) {
316                Ok(llm) => {
317                    let gen_config = GeneratorConfig::from(&config.ai_ingestion);
318                    info!(
319                        "AI example generation enabled: {} / {}",
320                        llm.name(),
321                        llm.model()
322                    );
323                    Some(Arc::new(ExampleGenerator::new(llm, gen_config)))
324                }
325                Err(e) => {
326                    warn!("Failed to create LLM provider for AI ingestion: {}", e);
327                    None
328                }
329            }
330        } else {
331            None
332        };
333
334        Ok(Self {
335            config,
336            embedding_provider,
337            vector_store,
338            #[cfg(feature = "hybrid-search")]
339            bm25_index,
340            #[cfg(feature = "reranker")]
341            reranker,
342            #[cfg(feature = "context-compression")]
343            compressor,
344            #[cfg(feature = "ai-ingestion")]
345            example_generator,
346            query_processor,
347            known_skills: Vec::new(),
348            known_tools: Vec::new(),
349        })
350    }
351
352    /// Create a pipeline with default configuration (FastEmbed, InMemory)
353    pub async fn default_pipeline() -> Result<Self> {
354        Self::from_config(SearchConfig::default()).await
355    }
356
357    /// Index documents into the pipeline
358    ///
359    /// This embeds the documents and stores them in both the vector store
360    /// and BM25 index (if hybrid search is enabled).
361    pub async fn index_documents(&self, documents: Vec<IndexDocument>) -> Result<PipelineIndexStats> {
362        if documents.is_empty() {
363            return Ok(PipelineIndexStats::default());
364        }
365
366        info!("Indexing {} documents", documents.len());
367
368        // Extract texts for embedding
369        let texts: Vec<String> = documents.iter().map(|d| d.content.clone()).collect();
370
371        // Generate embeddings
372        let embeddings = self.embedding_provider
373            .embed_documents_batched(texts)
374            .await
375            .context("Failed to generate embeddings")?;
376
377        // Create embedded documents
378        let embedded_docs: Vec<EmbeddedDocument> = documents
379            .into_iter()
380            .zip(embeddings)
381            .map(|(doc, embedding)| EmbeddedDocument {
382                id: doc.id,
383                content: Some(doc.content),
384                embedding,
385                metadata: doc.metadata,
386            })
387            .collect();
388
389        let _doc_count = embedded_docs.len();
390
391        // Index in BM25 if enabled
392        #[cfg(feature = "hybrid-search")]
393        if let Some(ref bm25) = self.bm25_index {
394            let mut bm25_guard = bm25.write().await;
395            for doc in &embedded_docs {
396                if let Some(ref content) = doc.content {
397                    bm25_guard.add_document(&doc.id, content)?;
398                }
399            }
400            debug!("Added {} documents to BM25 index", doc_count);
401        }
402
403        // Upsert to vector store
404        let stats = self.vector_store.upsert(embedded_docs).await
405            .context("Failed to upsert to vector store")?;
406
407        let total = self.vector_store.count(None).await.unwrap_or(0);
408
409        Ok(PipelineIndexStats {
410            documents_added: stats.inserted,
411            documents_updated: stats.updated,
412            total_documents: total,
413            index_size_bytes: None,
414        })
415    }
416
417    /// Index documents with AI-generated examples
418    ///
419    /// When AI ingestion is enabled, this method generates synthetic examples
420    /// for each tool and appends them to the document content before indexing.
421    #[cfg(feature = "ai-ingestion")]
422    pub async fn index_documents_with_generation(
423        &self,
424        documents: Vec<IndexDocument>,
425        tools: Vec<ToolDocumentation>,
426    ) -> Result<(PipelineIndexStats, Vec<GeneratedExample>)> {
427        if let Some(ref generator) = self.example_generator {
428            let enhanced = self.enhance_documents_with_examples(documents, &tools, generator).await?;
429            let all_examples = enhanced.1;
430            let stats = self.index_documents(enhanced.0).await?;
431            Ok((stats, all_examples))
432        } else {
433            let stats = self.index_documents(documents).await?;
434            Ok((stats, Vec::new()))
435        }
436    }
437
438    /// Index documents with streaming generation events
439    ///
440    /// Returns a stream of generation events while indexing documents.
441    /// Useful for progress feedback in CLI/MCP contexts.
442    #[cfg(feature = "ai-ingestion")]
443    pub fn index_documents_stream<'a>(
444        &'a self,
445        documents: Vec<IndexDocument>,
446        tools: Vec<ToolDocumentation>,
447    ) -> impl Stream<Item = GenerationEvent> + 'a {
448        async_stream::stream! {
449            if let Some(ref generator) = self.example_generator {
450                let total_tools = tools.len();
451                let mut all_examples = Vec::new();
452
453                // Generate examples for each tool
454                for (idx, tool) in tools.iter().enumerate() {
455                    let mut stream = Box::pin(generator.generate_stream(tool, idx + 1, total_tools));
456
457                    while let Some(event) = stream.next().await {
458                        // Collect examples from events
459                        if let GenerationEvent::Example { ref example } = event {
460                            all_examples.push(example.clone());
461                        }
462                        yield event;
463                    }
464                }
465
466                // Enhance documents with generated examples
467                let enhanced_docs = self.enhance_documents_inline(&documents, &all_examples);
468
469                // Index the enhanced documents
470                match self.index_documents(enhanced_docs).await {
471                    Ok(stats) => {
472                        yield GenerationEvent::Completed {
473                            total_examples: all_examples.len(),
474                            total_valid: all_examples.iter().filter(|e| e.validated).count(),
475                            total_tools,
476                            duration_ms: 0, // Would need to track actual duration
477                        };
478                        info!(
479                            "Indexed {} documents with {} generated examples",
480                            stats.total_documents, all_examples.len()
481                        );
482                    }
483                    Err(e) => {
484                        yield GenerationEvent::Error {
485                            message: format!("Failed to index documents: {}", e),
486                            recoverable: false,
487                            tool_name: None,
488                        };
489                    }
490                }
491            } else {
492                // No generator, just index directly
493                match self.index_documents(documents).await {
494                    Ok(stats) => {
495                        yield GenerationEvent::Completed {
496                            total_examples: 0,
497                            total_valid: 0,
498                            total_tools: tools.len(),
499                            duration_ms: 0,
500                        };
501                        info!("Indexed {} documents (no AI generation)", stats.total_documents);
502                    }
503                    Err(e) => {
504                        yield GenerationEvent::Error {
505                            message: format!("Failed to index documents: {}", e),
506                            recoverable: false,
507                            tool_name: None,
508                        };
509                    }
510                }
511            }
512        }
513    }
514
515    /// Enhance documents with AI-generated examples
516    #[cfg(feature = "ai-ingestion")]
517    async fn enhance_documents_with_examples(
518        &self,
519        documents: Vec<IndexDocument>,
520        tools: &[ToolDocumentation],
521        generator: &ExampleGenerator,
522    ) -> Result<(Vec<IndexDocument>, Vec<GeneratedExample>)> {
523        let mut all_examples = Vec::new();
524
525        // Generate examples for each tool
526        for tool in tools {
527            match generator.generate(tool).await {
528                Ok(examples) => {
529                    info!(
530                        "Generated {} examples for tool '{}'",
531                        examples.len(), tool.name
532                    );
533                    all_examples.extend(examples);
534                }
535                Err(e) => {
536                    warn!("Failed to generate examples for '{}': {}", tool.name, e);
537                }
538            }
539        }
540
541        // Enhance document content with examples
542        let enhanced = self.enhance_documents_inline(&documents, &all_examples);
543
544        Ok((enhanced, all_examples))
545    }
546
547    /// Enhance document content with generated examples (inline)
548    #[cfg(feature = "ai-ingestion")]
549    fn enhance_documents_inline(
550        &self,
551        documents: &[IndexDocument],
552        examples: &[GeneratedExample],
553    ) -> Vec<IndexDocument> {
554        if examples.is_empty() {
555            return documents.to_vec();
556        }
557
558        // Build example text to append
559        let example_text = Self::format_examples_for_embedding(examples);
560
561        // Enhance each document by appending examples
562        documents
563            .iter()
564            .map(|doc| {
565                // Append examples that might be relevant to this document
566                // For now, append all examples - could be more selective based on tool_name
567                let enhanced_content = format!(
568                    "{}\n\n## Generated Examples\n\n{}",
569                    doc.content, example_text
570                );
571
572                IndexDocument {
573                    id: doc.id.clone(),
574                    content: enhanced_content,
575                    metadata: doc.metadata.clone(),
576                }
577            })
578            .collect()
579    }
580
581    /// Format examples for embedding text
582    #[cfg(feature = "ai-ingestion")]
583    fn format_examples_for_embedding(examples: &[GeneratedExample]) -> String {
584        examples
585            .iter()
586            .map(|e| {
587                format!(
588                    "Example: {}\n{}",
589                    e.command,
590                    e.explanation
591                )
592            })
593            .collect::<Vec<_>>()
594            .join("\n\n")
595    }
596
597    /// Check if AI example generation is enabled
598    #[cfg(feature = "ai-ingestion")]
599    pub fn has_example_generator(&self) -> bool {
600        self.example_generator.is_some()
601    }
602
603    /// Get the example generator info (provider name, model)
604    #[cfg(feature = "ai-ingestion")]
605    pub fn example_generator_info(&self) -> Option<(&str, &str)> {
606        self.example_generator.as_ref().map(|g| {
607            (g.provider_name(), g.model_name())
608        })
609    }
610
611    /// Search for documents matching the query
612    ///
613    /// # Arguments
614    /// * `query` - Natural language search query
615    /// * `top_k` - Maximum number of results to return
616    ///
617    /// # Returns
618    /// Ranked list of search results
619    pub async fn search(&self, query: &str, top_k: usize) -> Result<Vec<PipelineSearchResult>> {
620        debug!("Searching for: {} (top_k={})", query, top_k);
621
622        // Process query for understanding
623        let processed = self.query_processor.process(query);
624        let search_query = if !processed.normalized.is_empty() {
625            &processed.normalized
626        } else {
627            query
628        };
629
630        debug!(
631            "Query processed: intent={:?}, confidence={:.2}",
632            processed.intent, processed.intent_confidence
633        );
634
635        // Generate query embedding
636        let query_embedding = self.embedding_provider
637            .embed_query(search_query)
638            .await
639            .context("Failed to embed query")?;
640
641        // Determine how many candidates to fetch
642        let first_stage_k = self.config.retrieval.first_stage_k.max(top_k * 2);
643
644        // Perform search (hybrid or dense-only)
645        let candidates = self.retrieve_candidates(&query_embedding, search_query, first_stage_k).await?;
646
647        if candidates.is_empty() {
648            return Ok(Vec::new());
649        }
650
651        // Rerank if enabled
652        #[cfg(feature = "reranker")]
653        let reranked = if let Some(ref reranker) = self.reranker {
654            self.rerank_results(reranker.as_ref(), query, candidates, top_k).await?
655        } else {
656            candidates.into_iter().take(top_k).collect()
657        };
658
659        #[cfg(not(feature = "reranker"))]
660        let reranked: Vec<PipelineSearchResult> = candidates.into_iter().take(top_k).collect();
661
662        Ok(reranked)
663    }
664
665    /// Search with metadata filtering
666    pub async fn search_with_filter(
667        &self,
668        query: &str,
669        filter: Filter,
670        top_k: usize,
671    ) -> Result<Vec<PipelineSearchResult>> {
672        let query_embedding = self.embedding_provider
673            .embed_query(query)
674            .await
675            .context("Failed to embed query")?;
676
677        let results = self.vector_store
678            .search(query_embedding, Some(filter), top_k)
679            .await
680            .context("Vector search failed")?;
681
682        Ok(results
683            .into_iter()
684            .map(|r| PipelineSearchResult {
685                id: r.id,
686                content: r.content.unwrap_or_default(),
687                score: r.score,
688                metadata: r.metadata,
689                rerank_score: None,
690            })
691            .collect())
692    }
693
694    /// Retrieve candidates using hybrid or dense search
695    async fn retrieve_candidates(
696        &self,
697        query_embedding: &[f32],
698        _query_text: &str,
699        k: usize,
700    ) -> Result<Vec<PipelineSearchResult>> {
701        #[cfg(feature = "hybrid-search")]
702        if self.config.retrieval.enable_hybrid {
703            if let Some(ref bm25) = self.bm25_index {
704                return self.hybrid_retrieve(query_embedding, query_text, bm25, k).await;
705            }
706        }
707
708        // Dense-only search
709        let results = self.vector_store
710            .search(query_embedding.to_vec(), None, k)
711            .await
712            .context("Vector search failed")?;
713
714        Ok(results
715            .into_iter()
716            .map(|r| PipelineSearchResult {
717                id: r.id,
718                content: r.content.unwrap_or_default(),
719                score: r.score,
720                metadata: r.metadata,
721                rerank_score: None,
722            })
723            .collect())
724    }
725
726    /// Perform hybrid retrieval (dense + BM25)
727    #[cfg(feature = "hybrid-search")]
728    async fn hybrid_retrieve(
729        &self,
730        query_embedding: &[f32],
731        query_text: &str,
732        bm25: &Arc<RwLock<BM25Index>>,
733        k: usize,
734    ) -> Result<Vec<PipelineSearchResult>> {
735        use super::reciprocal_rank_fusion;
736
737        // Dense search
738        let dense_results = self.vector_store
739            .search(query_embedding.to_vec(), None, k)
740            .await
741            .context("Dense search failed")?;
742
743        // BM25 search
744        let bm25_guard = bm25.read().await;
745        let sparse_results = bm25_guard.search(query_text, k)?;
746
747        // Convert to common format for fusion
748        let dense_scores: Vec<(String, f32)> = dense_results
749            .iter()
750            .map(|r| (r.id.clone(), r.score))
751            .collect();
752
753        let sparse_scores: Vec<(String, f32)> = sparse_results
754            .iter()
755            .map(|r| (r.doc_id.clone(), r.score))
756            .collect();
757
758        // Reciprocal Rank Fusion
759        let rrf_k = self.config.retrieval.rrf_k;
760        let fused = reciprocal_rank_fusion(
761            vec![dense_scores, sparse_scores],
762            rrf_k,
763        );
764
765        // Rebuild results with fused scores
766        let mut results: Vec<PipelineSearchResult> = Vec::with_capacity(k);
767
768        for (id, score) in fused.into_iter().take(k) {
769            // Find the document content from dense results or BM25
770            if let Some(dense_match) = dense_results.iter().find(|r| r.id == id) {
771                results.push(PipelineSearchResult {
772                    id: dense_match.id.clone(),
773                    content: dense_match.content.clone().unwrap_or_default(),
774                    score,
775                    metadata: dense_match.metadata.clone(),
776                    rerank_score: None,
777                });
778            } else if let Some(_sparse_match) = sparse_results.iter().find(|r| r.doc_id == id) {
779                // Get full document from vector store
780                if let Ok(docs) = self.vector_store.get(vec![id.clone()]).await {
781                    if let Some(doc) = docs.into_iter().next() {
782                        results.push(PipelineSearchResult {
783                            id: doc.id,
784                            content: doc.content.unwrap_or_default(),
785                            score,
786                            metadata: doc.metadata,
787                            rerank_score: None,
788                        });
789                    }
790                }
791            }
792        }
793
794        Ok(results)
795    }
796
797    /// Rerank results using cross-encoder
798    #[cfg(feature = "reranker")]
799    async fn rerank_results(
800        &self,
801        reranker: &dyn Reranker,
802        query: &str,
803        candidates: Vec<PipelineSearchResult>,
804        top_k: usize,
805    ) -> Result<Vec<PipelineSearchResult>> {
806        if candidates.is_empty() {
807            return Ok(candidates);
808        }
809
810        let rerank_docs: Vec<RerankDocument> = candidates
811            .iter()
812            .map(|r| RerankDocument {
813                id: r.id.clone(),
814                text: r.content.clone(),
815            })
816            .collect();
817
818        let reranked = reranker.rerank(query, rerank_docs, top_k)?;
819
820        // Rebuild results with rerank scores
821        let results: Vec<PipelineSearchResult> = reranked
822            .into_iter()
823            .filter_map(|rr| {
824                candidates.iter().find(|c| c.id == rr.document.id).map(|c| {
825                    PipelineSearchResult {
826                        id: c.id.clone(),
827                        content: c.content.clone(),
828                        score: c.score,
829                        metadata: c.metadata.clone(),
830                        rerank_score: Some(rr.score),
831                    }
832                })
833            })
834            .collect();
835
836        Ok(results)
837    }
838
839    /// Get compressed context for LLM consumption
840    #[cfg(feature = "context-compression")]
841    pub fn compress_results(
842        &self,
843        results: &[PipelineSearchResult],
844    ) -> Result<Vec<CompressedToolContext>> {
845        let compressor = self.compressor.as_ref()
846            .context("Context compression not enabled")?;
847
848        let tools: Vec<_> = results
849            .iter()
850            .map(|r| {
851                // Parse as tool if possible
852                super::CompressedToolContext {
853                    name: r.metadata.tool_name.clone().unwrap_or_else(|| r.id.clone()),
854                    description: r.content.clone(),
855                    parameters: Vec::new(),
856                    example: None,
857                    score: r.rerank_score.unwrap_or(r.score),
858                }
859            })
860            .collect();
861
862        Ok(tools)
863    }
864
865    /// Check health of all pipeline components
866    pub async fn health_check(&self) -> PipelineHealth {
867        let mut healthy = true;
868
869        // Check embedding provider
870        let embedding_status = match self.embedding_provider.health_check().await {
871            Ok(true) => ProviderStatus {
872                name: self.embedding_provider.provider_name().to_string(),
873                healthy: true,
874                error: None,
875            },
876            Ok(false) => {
877                healthy = false;
878                ProviderStatus {
879                    name: self.embedding_provider.provider_name().to_string(),
880                    healthy: false,
881                    error: Some("Provider reported unhealthy".to_string()),
882                }
883            }
884            Err(e) => {
885                healthy = false;
886                ProviderStatus {
887                    name: self.embedding_provider.provider_name().to_string(),
888                    healthy: false,
889                    error: Some(e.to_string()),
890                }
891            }
892        };
893
894        // Check vector store
895        let vector_status = match self.vector_store.health_check().await {
896            Ok(status) => ProviderStatus {
897                name: self.vector_store.backend_name().to_string(),
898                healthy: status.healthy,
899                error: if status.healthy { None } else { Some("Unhealthy".to_string()) },
900            },
901            Err(e) => {
902                healthy = false;
903                ProviderStatus {
904                    name: self.vector_store.backend_name().to_string(),
905                    healthy: false,
906                    error: Some(e.to_string()),
907                }
908            }
909        };
910
911        // Check BM25 if enabled
912        #[cfg(feature = "hybrid-search")]
913        let bm25_status = if self.bm25_index.is_some() {
914            Some(ProviderStatus {
915                name: "BM25 (Tantivy)".to_string(),
916                healthy: true,
917                error: None,
918            })
919        } else {
920            None
921        };
922        #[cfg(not(feature = "hybrid-search"))]
923        let bm25_status: Option<ProviderStatus> = None;
924
925        // Check reranker if enabled
926        #[cfg(feature = "reranker")]
927        let reranker_status = if let Some(ref reranker) = self.reranker {
928            Some(ProviderStatus {
929                name: reranker.model_name().to_string(),
930                healthy: true,
931                error: None,
932            })
933        } else {
934            None
935        };
936        #[cfg(not(feature = "reranker"))]
937        let reranker_status: Option<ProviderStatus> = None;
938
939        // Check example generator if enabled
940        #[cfg(feature = "ai-ingestion")]
941        let generator_status = if let Some(ref generator) = self.example_generator {
942            Some(ProviderStatus {
943                name: format!("{}/{}", generator.provider_name(), generator.model_name()),
944                healthy: true,
945                error: None,
946            })
947        } else {
948            None
949        };
950        #[cfg(not(feature = "ai-ingestion"))]
951        let generator_status: Option<ProviderStatus> = None;
952
953        let indexed = self.vector_store.count(None).await.unwrap_or(0);
954
955        PipelineHealth {
956            healthy,
957            embedding_provider: embedding_status,
958            vector_store: vector_status,
959            bm25_index: bm25_status,
960            reranker: reranker_status,
961            example_generator: generator_status,
962            indexed_documents: indexed,
963        }
964    }
965
966    /// Get the number of indexed documents
967    pub async fn document_count(&self) -> Result<usize> {
968        self.vector_store.count(None).await
969    }
970
971    /// Clear all indexed documents
972    pub async fn clear(&self) -> Result<()> {
973        // For InMemory, we'd need to recreate it
974        // For now, just warn
975        warn!("Clear not fully implemented - documents may persist");
976        Ok(())
977    }
978
979    /// Get the configuration
980    pub fn config(&self) -> &SearchConfig {
981        &self.config
982    }
983
984    /// Get the embedding provider info
985    pub fn embedding_info(&self) -> (&str, &str, usize) {
986        (
987            self.embedding_provider.provider_name(),
988            self.embedding_provider.model_name(),
989            self.embedding_provider.dimensions(),
990        )
991    }
992
993    /// Add known skills to the query processor for better understanding
994    pub fn add_known_skill(&mut self, skill_name: &str) {
995        self.known_skills.push(skill_name.to_string());
996        self.rebuild_query_processor();
997    }
998
999    /// Add known tools to the query processor
1000    pub fn add_known_tool(&mut self, tool_name: &str) {
1001        self.known_tools.push(tool_name.to_string());
1002        self.rebuild_query_processor();
1003    }
1004
1005    /// Add multiple known skills at once
1006    pub fn add_known_skills(&mut self, skills: impl IntoIterator<Item = impl Into<String>>) {
1007        for skill in skills {
1008            self.known_skills.push(skill.into());
1009        }
1010        self.rebuild_query_processor();
1011    }
1012
1013    /// Add multiple known tools at once
1014    pub fn add_known_tools(&mut self, tools: impl IntoIterator<Item = impl Into<String>>) {
1015        for tool in tools {
1016            self.known_tools.push(tool.into());
1017        }
1018        self.rebuild_query_processor();
1019    }
1020
1021    /// Rebuild the query processor with current known skills and tools
1022    fn rebuild_query_processor(&mut self) {
1023        self.query_processor = QueryProcessor::new()
1024            .with_skills(self.known_skills.iter().cloned())
1025            .with_tools(self.known_tools.iter().cloned());
1026    }
1027
1028    /// Process a query without searching (for debugging)
1029    pub fn process_query(&self, query: &str) -> ProcessedQuery {
1030        self.query_processor.process(query)
1031    }
1032}
1033
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a pipeline from the default configuration, panicking on failure.
    async fn default_pipeline() -> SearchPipeline {
        SearchPipeline::from_config(SearchConfig::default())
            .await
            .unwrap()
    }

    /// Document tagged with both a skill name and a tool name.
    fn tool_doc(id: &str, content: &str, skill: &str, tool: &str) -> IndexDocument {
        IndexDocument {
            id: id.to_string(),
            content: content.to_string(),
            metadata: DocumentMetadata {
                skill_name: Some(skill.to_string()),
                tool_name: Some(tool.to_string()),
                ..Default::default()
            },
        }
    }

    /// Document tagged with a skill name only; other metadata stays default.
    fn skill_doc(id: &str, content: &str, skill: &str) -> IndexDocument {
        IndexDocument {
            id: id.to_string(),
            content: content.to_string(),
            metadata: DocumentMetadata {
                skill_name: Some(skill.to_string()),
                ..Default::default()
            },
        }
    }

    #[tokio::test]
    async fn test_pipeline_creation() {
        let outcome = SearchPipeline::from_config(SearchConfig::default()).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_pipeline_index_and_search() {
        let pipeline = default_pipeline().await;

        // Two Kubernetes documents and one unrelated AWS document.
        let corpus = vec![
            tool_doc(
                "1",
                "List all Kubernetes pods in the cluster",
                "kubernetes",
                "list-pods",
            ),
            tool_doc(
                "2",
                "Deploy a new application to Kubernetes",
                "kubernetes",
                "deploy",
            ),
            tool_doc("3", "Create an S3 bucket in AWS", "aws", "create-bucket"),
        ];

        let stats = pipeline.index_documents(corpus).await.unwrap();
        assert_eq!(stats.documents_added, 3);
        assert_eq!(stats.total_documents, 3);

        let hits = pipeline.search("kubernetes pods", 2).await.unwrap();
        assert!(!hits.is_empty());
        assert!(hits.len() <= 2);

        // The best hit should be one of the Kubernetes documents.
        assert!(hits[0].content.to_lowercase().contains("kubernetes"));
    }

    #[tokio::test]
    async fn test_pipeline_health_check() {
        let pipeline = default_pipeline().await;

        let report = pipeline.health_check().await;
        assert!(report.healthy);
        assert!(report.embedding_provider.healthy);
        assert!(report.vector_store.healthy);
    }

    #[tokio::test]
    async fn test_query_processing() {
        let mut pipeline = default_pipeline().await;

        pipeline.add_known_skill("kubernetes");
        pipeline.add_known_tool("list-pods");

        let processed = pipeline.process_query("how do I list k8s pods?");
        assert!(!processed.normalized.is_empty());
    }

    #[tokio::test]
    async fn test_empty_search() {
        let pipeline = default_pipeline().await;

        // Nothing has been indexed, so the search must come back empty.
        let hits = pipeline.search("kubernetes", 5).await.unwrap();
        assert!(hits.is_empty());
    }

    #[tokio::test]
    async fn test_search_with_filter() {
        let pipeline = default_pipeline().await;

        let corpus = vec![
            skill_doc("1", "Kubernetes pods", "kubernetes"),
            skill_doc("2", "AWS S3 bucket", "aws"),
        ];
        pipeline.index_documents(corpus).await.unwrap();

        let filter = Filter::new().skill("kubernetes");
        let hits = pipeline
            .search_with_filter("bucket", filter, 5)
            .await
            .unwrap();

        // The query text matches the AWS document, but the filter must keep
        // every returned skill name equal to "kubernetes".
        for skill in hits.iter().filter_map(|hit| hit.metadata.skill_name.as_ref()) {
            assert_eq!(skill, "kubernetes");
        }
    }
}