use anyhow::{Context, Result};
use std::sync::Arc;
use tracing::{debug, info, warn};
use crate::embeddings::{EmbeddingProvider, EmbeddingProviderFactory};
use crate::search_config::{BackendType, SearchConfig};
use crate::vector_store::{
EmbeddedDocument, DocumentMetadata, FileVectorStore, Filter, InMemoryVectorStore, VectorStore,
};
#[cfg(feature = "ai-ingestion")]
use crate::generation::{ExampleGenerator, GeneratorConfig, GenerationEvent, GeneratedExample, create_llm_provider};
#[cfg(feature = "ai-ingestion")]
use crate::skill_md::ToolDocumentation;
#[cfg(feature = "ai-ingestion")]
use futures_util::Stream;
#[cfg(feature = "ai-ingestion")]
use tokio_stream::StreamExt;
#[cfg(feature = "qdrant")]
use crate::vector_store::QdrantVectorStore;
#[cfg(feature = "hybrid-search")]
use super::{BM25Index, BM25Config};
#[cfg(feature = "hybrid-search")]
use tokio::sync::RwLock;
#[cfg(feature = "reranker")]
use super::{FastEmbedReranker, RerankerConfig as SearchRerankerConfig, Reranker, RerankDocument};
#[cfg(feature = "context-compression")]
use super::{ContextCompressor, CompressionConfig, CompressedToolContext};
use super::{QueryProcessor, ProcessedQuery};
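/// A single hit returned by the pipeline.
///
/// `score` is the first-stage retrieval score (dense similarity, or the
/// fused score under hybrid search); `rerank_score` is populated only when
/// a reranker ran. A common consumer pattern is to prefer the rerank score
/// when present:
///
/// ```ignore
/// // Sketch: effective relevance with fallback to the retrieval score.
/// let relevance = result.rerank_score.unwrap_or(result.score);
/// ```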
#[derive(Debug, Clone)]
pub struct PipelineSearchResult {
pub id: String,
pub content: String,
pub score: f32,
pub metadata: DocumentMetadata,
pub rerank_score: Option<f32>,
}
#[derive(Debug, Clone, Default)]
pub struct PipelineIndexStats {
pub documents_added: usize,
pub documents_updated: usize,
pub total_documents: usize,
pub index_size_bytes: Option<usize>,
}
#[derive(Debug, Clone)]
pub struct PipelineHealth {
pub healthy: bool,
pub embedding_provider: ProviderStatus,
pub vector_store: ProviderStatus,
pub bm25_index: Option<ProviderStatus>,
pub reranker: Option<ProviderStatus>,
pub example_generator: Option<ProviderStatus>,
pub indexed_documents: usize,
}
#[derive(Debug, Clone)]
pub struct ProviderStatus {
pub name: String,
pub healthy: bool,
pub error: Option<String>,
}
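/// A raw document to be embedded and indexed by the pipeline.
///
/// A construction sketch, using the optional `DocumentMetadata` fields the
/// way the tests at the bottom of this module do:
///
/// ```ignore
/// let doc = IndexDocument {
///     id: "list-pods".to_string(),
///     content: "List all Kubernetes pods in the cluster".to_string(),
///     metadata: DocumentMetadata {
///         skill_name: Some("kubernetes".to_string()),
///         tool_name: Some("list-pods".to_string()),
///         ..Default::default()
///     },
/// };
/// ```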
#[derive(Debug, Clone)]
pub struct IndexDocument {
pub id: String,
pub content: String,
pub metadata: DocumentMetadata,
}
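/// End-to-end retrieval pipeline: query processing, embedding, first-stage
/// retrieval (dense, plus BM25 fusion under the `hybrid-search` feature),
/// optional reranking, and optional context compression, all driven by a
/// single `SearchConfig`.
///
/// A minimal end-to-end sketch, assuming the default configuration (errors
/// propagated via `anyhow::Result`):
///
/// ```ignore
/// let pipeline = SearchPipeline::default_pipeline().await?;
/// pipeline.index_documents(docs).await?;
/// let hits = pipeline.search("kubernetes pods", 5).await?;
/// ```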
pub struct SearchPipeline {
config: SearchConfig,
embedding_provider: Arc<dyn EmbeddingProvider>,
vector_store: Arc<dyn VectorStore>,
#[cfg(feature = "hybrid-search")]
bm25_index: Option<Arc<RwLock<BM25Index>>>,
#[cfg(feature = "reranker")]
reranker: Option<Arc<dyn Reranker>>,
#[cfg(feature = "context-compression")]
compressor: Option<ContextCompressor>,
#[cfg(feature = "ai-ingestion")]
example_generator: Option<Arc<ExampleGenerator>>,
query_processor: QueryProcessor,
known_skills: Vec<String>,
known_tools: Vec<String>,
}
impl SearchPipeline {
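/// Builds a pipeline from a validated `SearchConfig`: the embedding
/// provider and vector store are always constructed; the BM25 index,
/// reranker, compressor, and example generator are created only when their
/// features are compiled in and enabled in the config.
///
/// ```ignore
/// // Sketch: tweaking the default config before construction (field
/// // names follow the accesses made in this module).
/// let mut config = SearchConfig::default();
/// config.retrieval.final_k = 10;
/// let pipeline = SearchPipeline::from_config(config).await?;
/// ```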
pub async fn from_config(config: SearchConfig) -> Result<Self> {
info!("Initializing SearchPipeline with config");
config.validate().context("Invalid search configuration")?;
let embedding_config = crate::embeddings::EmbeddingConfig {
provider: config.embedding.provider.parse()
.unwrap_or(crate::embeddings::EmbeddingProviderType::FastEmbed),
model: Some(config.embedding.model.clone()),
api_key: config.embedding.openai_api_key.clone(),
base_url: config.embedding.ollama_host.clone(),
batch_size: 100,
};
let embedding_provider = EmbeddingProviderFactory::create(&embedding_config)
.context("Failed to create embedding provider")?;
debug!(
"Created embedding provider: {} ({})",
embedding_provider.provider_name(),
embedding_provider.model_name()
);
let vector_store: Arc<dyn VectorStore> = match config.backend.backend_type {
BackendType::File => {
let file_config_from_search = config.file.as_ref();
let file_config = crate::vector_store::FileConfig {
storage_dir: file_config_from_search.and_then(|c| c.storage_path.clone()),
distance_metric: file_config_from_search
.map(|c| c.distance_metric)
.unwrap_or(crate::vector_store::DistanceMetric::Cosine),
};
Arc::new(
FileVectorStore::new(file_config)
.context("Failed to create File vector store")?
)
}
BackendType::InMemory => {
Arc::new(InMemoryVectorStore::with_dimensions(config.embedding.dimensions))
}
#[cfg(feature = "qdrant")]
BackendType::Qdrant => {
let qdrant_config = config.qdrant.as_ref()
.context("Qdrant config required for qdrant backend")?;
let qdrant_store = QdrantVectorStore::new(crate::vector_store::QdrantConfig {
url: qdrant_config.url.clone(),
api_key: qdrant_config.api_key.clone(),
collection_name: qdrant_config.collection.clone(),
vector_size: config.embedding.dimensions,
..Default::default()
}).await.context("Failed to create Qdrant store")?;
Arc::new(qdrant_store)
}
#[cfg(not(feature = "qdrant"))]
BackendType::Qdrant => {
anyhow::bail!("Qdrant backend requires 'qdrant' feature to be enabled");
}
};
debug!("Created vector store: {}", vector_store.backend_name());
#[cfg(feature = "hybrid-search")]
let bm25_index = if config.retrieval.enable_hybrid {
let bm25_config = BM25Config::default();
let index = BM25Index::new(bm25_config)?;
Some(Arc::new(RwLock::new(index)))
} else {
None
};
#[cfg(feature = "reranker")]
let reranker: Option<Arc<dyn Reranker>> = if config.reranker.enabled {
let reranker_config = SearchRerankerConfig {
model: config.reranker.model.parse().unwrap_or_default(),
top_k: config.retrieval.final_k,
..Default::default()
};
let fastembed_reranker = FastEmbedReranker::new(reranker_config)
.context("Failed to create reranker")?;
Some(Arc::new(fastembed_reranker))
} else {
None
};
#[cfg(feature = "context-compression")]
let compressor = {
let compression_config = CompressionConfig {
max_tokens_per_result: config.context.max_tokens_per_result,
max_total_tokens: config.context.max_total_tokens,
strategy: match config.context.compression {
crate::search_config::CompressionStrategy::Extractive => {
super::CompressionStrategy::Extractive
}
crate::search_config::CompressionStrategy::Template => {
super::CompressionStrategy::TemplateBased
}
crate::search_config::CompressionStrategy::Progressive => {
super::CompressionStrategy::Progressive
}
crate::search_config::CompressionStrategy::None => {
super::CompressionStrategy::None
}
},
..Default::default()
};
Some(ContextCompressor::new(compression_config)?)
};
let query_processor = QueryProcessor::new();
#[cfg(feature = "ai-ingestion")]
let example_generator = if config.ai_ingestion.enabled {
match create_llm_provider(&config.ai_ingestion) {
Ok(llm) => {
let gen_config = GeneratorConfig::from(&config.ai_ingestion);
info!(
"AI example generation enabled: {} / {}",
llm.name(),
llm.model()
);
Some(Arc::new(ExampleGenerator::new(llm, gen_config)))
}
Err(e) => {
warn!("Failed to create LLM provider for AI ingestion: {}", e);
None
}
}
} else {
None
};
Ok(Self {
config,
embedding_provider,
vector_store,
#[cfg(feature = "hybrid-search")]
bm25_index,
#[cfg(feature = "reranker")]
reranker,
#[cfg(feature = "context-compression")]
compressor,
#[cfg(feature = "ai-ingestion")]
example_generator,
query_processor,
known_skills: Vec::new(),
known_tools: Vec::new(),
})
}
pub async fn default_pipeline() -> Result<Self> {
Self::from_config(SearchConfig::default()).await
}
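/// Embeds and upserts a batch of documents, mirroring their content into
/// the BM25 index when hybrid search is enabled.
///
/// ```ignore
/// let stats = pipeline.index_documents(docs).await?;
/// println!("added {} (total {})", stats.documents_added, stats.total_documents);
/// ```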
pub async fn index_documents(&self, documents: Vec<IndexDocument>) -> Result<PipelineIndexStats> {
if documents.is_empty() {
return Ok(PipelineIndexStats::default());
}
info!("Indexing {} documents", documents.len());
let texts: Vec<String> = documents.iter().map(|d| d.content.clone()).collect();
let embeddings = self.embedding_provider
.embed_documents_batched(texts)
.await
.context("Failed to generate embeddings")?;
let embedded_docs: Vec<EmbeddedDocument> = documents
.into_iter()
.zip(embeddings)
.map(|(doc, embedding)| EmbeddedDocument {
id: doc.id,
content: Some(doc.content),
embedding,
metadata: doc.metadata,
})
.collect();
#[cfg(feature = "hybrid-search")]
if let Some(ref bm25) = self.bm25_index {
let mut bm25_guard = bm25.write().await;
for doc in &embedded_docs {
if let Some(ref content) = doc.content {
bm25_guard.add_document(&doc.id, content)?;
}
}
debug!("Added {} documents to BM25 index", embedded_docs.len());
}
let stats = self.vector_store.upsert(embedded_docs).await
.context("Failed to upsert to vector store")?;
let total = self.vector_store.count(None).await.unwrap_or(0);
Ok(PipelineIndexStats {
documents_added: stats.inserted,
documents_updated: stats.updated,
total_documents: total,
index_size_bytes: None,
})
}
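/// Like `index_documents`, but first runs the AI example generator over
/// `tools` (when one was configured) and appends the generated examples to
/// each document before embedding. Falls back to plain indexing otherwise.
///
/// ```ignore
/// // Sketch, assuming the "ai-ingestion" feature and a working LLM provider.
/// let (stats, examples) = pipeline
///     .index_documents_with_generation(docs, tools)
///     .await?;
/// ```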
#[cfg(feature = "ai-ingestion")]
pub async fn index_documents_with_generation(
&self,
documents: Vec<IndexDocument>,
tools: Vec<ToolDocumentation>,
) -> Result<(PipelineIndexStats, Vec<GeneratedExample>)> {
if let Some(ref generator) = self.example_generator {
let enhanced = self.enhance_documents_with_examples(documents, &tools, generator).await?;
let all_examples = enhanced.1;
let stats = self.index_documents(enhanced.0).await?;
Ok((stats, all_examples))
} else {
let stats = self.index_documents(documents).await?;
Ok((stats, Vec::new()))
}
}
#[cfg(feature = "ai-ingestion")]
pub fn index_documents_stream<'a>(
&'a self,
documents: Vec<IndexDocument>,
tools: Vec<ToolDocumentation>,
) -> impl Stream<Item = GenerationEvent> + 'a {
async_stream::stream! {
if let Some(ref generator) = self.example_generator {
let total_tools = tools.len();
let mut all_examples = Vec::new();
for (idx, tool) in tools.iter().enumerate() {
let mut stream = Box::pin(generator.generate_stream(tool, idx + 1, total_tools));
while let Some(event) = stream.next().await {
if let GenerationEvent::Example { ref example } = event {
all_examples.push(example.clone());
}
yield event;
}
}
let enhanced_docs = self.enhance_documents_inline(&documents, &all_examples);
match self.index_documents(enhanced_docs).await {
Ok(stats) => {
yield GenerationEvent::Completed {
total_examples: all_examples.len(),
total_valid: all_examples.iter().filter(|e| e.validated).count(),
total_tools,
duration_ms: 0,
};
info!(
"Indexed {} documents with {} generated examples",
stats.total_documents, all_examples.len()
);
}
Err(e) => {
yield GenerationEvent::Error {
message: format!("Failed to index documents: {}", e),
recoverable: false,
tool_name: None,
};
}
}
} else {
match self.index_documents(documents).await {
Ok(stats) => {
yield GenerationEvent::Completed {
total_examples: 0,
total_valid: 0,
total_tools: tools.len(),
duration_ms: 0,
};
info!("Indexed {} documents (no AI generation)", stats.total_documents);
}
Err(e) => {
yield GenerationEvent::Error {
message: format!("Failed to index documents: {}", e),
recoverable: false,
tool_name: None,
};
}
}
}
}
}
#[cfg(feature = "ai-ingestion")]
async fn enhance_documents_with_examples(
&self,
documents: Vec<IndexDocument>,
tools: &[ToolDocumentation],
generator: &ExampleGenerator,
) -> Result<(Vec<IndexDocument>, Vec<GeneratedExample>)> {
let mut all_examples = Vec::new();
for tool in tools {
match generator.generate(tool).await {
Ok(examples) => {
info!(
"Generated {} examples for tool '{}'",
examples.len(), tool.name
);
all_examples.extend(examples);
}
Err(e) => {
warn!("Failed to generate examples for '{}': {}", tool.name, e);
}
}
}
let enhanced = self.enhance_documents_inline(&documents, &all_examples);
Ok((enhanced, all_examples))
}
#[cfg(feature = "ai-ingestion")]
fn enhance_documents_inline(
&self,
documents: &[IndexDocument],
examples: &[GeneratedExample],
) -> Vec<IndexDocument> {
if examples.is_empty() {
return documents.to_vec();
}
let example_text = Self::format_examples_for_embedding(examples);
documents
.iter()
.map(|doc| {
let enhanced_content = format!(
"{}\n\n## Generated Examples\n\n{}",
doc.content, example_text
);
IndexDocument {
id: doc.id.clone(),
content: enhanced_content,
metadata: doc.metadata.clone(),
}
})
.collect()
}
#[cfg(feature = "ai-ingestion")]
fn format_examples_for_embedding(examples: &[GeneratedExample]) -> String {
examples
.iter()
.map(|e| {
format!(
"Example: {}\n{}",
e.command,
e.explanation
)
})
.collect::<Vec<_>>()
.join("\n\n")
}
#[cfg(feature = "ai-ingestion")]
pub fn has_example_generator(&self) -> bool {
self.example_generator.is_some()
}
#[cfg(feature = "ai-ingestion")]
pub fn example_generator_info(&self) -> Option<(&str, &str)> {
self.example_generator.as_ref().map(|g| {
(g.provider_name(), g.model_name())
})
}
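/// Full search path: query processing, query embedding, first-stage
/// retrieval (hybrid when enabled), then reranking down to `top_k` when a
/// reranker is configured.
///
/// ```ignore
/// let results = pipeline.search("list k8s pods", 5).await?;
/// for r in &results {
///     println!("{} ({:.3})", r.id, r.rerank_score.unwrap_or(r.score));
/// }
/// ```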
pub async fn search(&self, query: &str, top_k: usize) -> Result<Vec<PipelineSearchResult>> {
debug!("Searching for: {} (top_k={})", query, top_k);
let processed = self.query_processor.process(query);
let search_query = if !processed.normalized.is_empty() {
&processed.normalized
} else {
query
};
debug!(
"Query processed: intent={:?}, confidence={:.2}",
processed.intent, processed.intent_confidence
);
let query_embedding = self.embedding_provider
.embed_query(search_query)
.await
.context("Failed to embed query")?;
let first_stage_k = self.config.retrieval.first_stage_k.max(top_k * 2);
let candidates = self.retrieve_candidates(&query_embedding, search_query, first_stage_k).await?;
if candidates.is_empty() {
return Ok(Vec::new());
}
#[cfg(feature = "reranker")]
let reranked = if let Some(ref reranker) = self.reranker {
self.rerank_results(reranker.as_ref(), query, candidates, top_k).await?
} else {
candidates.into_iter().take(top_k).collect()
};
#[cfg(not(feature = "reranker"))]
let reranked: Vec<PipelineSearchResult> = candidates.into_iter().take(top_k).collect();
Ok(reranked)
}
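/// Dense-only search restricted by a metadata `Filter`; this path bypasses
/// query processing, hybrid fusion, and reranking.
///
/// ```ignore
/// // Sketch, using the Filter builder the way the tests below do.
/// let filter = Filter::new().skill("kubernetes");
/// let results = pipeline.search_with_filter("pods", filter, 5).await?;
/// ```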
pub async fn search_with_filter(
&self,
query: &str,
filter: Filter,
top_k: usize,
) -> Result<Vec<PipelineSearchResult>> {
let query_embedding = self.embedding_provider
.embed_query(query)
.await
.context("Failed to embed query")?;
let results = self.vector_store
.search(query_embedding, Some(filter), top_k)
.await
.context("Vector search failed")?;
Ok(results
.into_iter()
.map(|r| PipelineSearchResult {
id: r.id,
content: r.content.unwrap_or_default(),
score: r.score,
metadata: r.metadata,
rerank_score: None,
})
.collect())
}
async fn retrieve_candidates(
&self,
query_embedding: &[f32],
#[cfg_attr(not(feature = "hybrid-search"), allow(unused_variables))]
query_text: &str,
k: usize,
) -> Result<Vec<PipelineSearchResult>> {
#[cfg(feature = "hybrid-search")]
if self.config.retrieval.enable_hybrid {
if let Some(ref bm25) = self.bm25_index {
return self.hybrid_retrieve(query_embedding, query_text, bm25, k).await;
}
}
let results = self.vector_store
.search(query_embedding.to_vec(), None, k)
.await
.context("Vector search failed")?;
Ok(results
.into_iter()
.map(|r| PipelineSearchResult {
id: r.id,
content: r.content.unwrap_or_default(),
score: r.score,
metadata: r.metadata,
rerank_score: None,
})
.collect())
}
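/// Combines dense and BM25 result lists with reciprocal rank fusion.
/// Conventionally, RRF scores each document as the sum of
/// `1 / (rrf_k + rank)` over the lists it appears in, so a document ranked
/// well by either retriever surfaces in the fused list.
///
/// ```ignore
/// // Illustrative arithmetic with rrf_k = 60: ranked 1st dense and 3rd
/// // sparse beats being ranked 2nd in a single list.
/// let rrf = |rank: f32| 1.0 / (60.0 + rank);
/// assert!(rrf(1.0) + rrf(3.0) > rrf(2.0));
/// ```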
#[cfg(feature = "hybrid-search")]
async fn hybrid_retrieve(
&self,
query_embedding: &[f32],
query_text: &str,
bm25: &Arc<RwLock<BM25Index>>,
k: usize,
) -> Result<Vec<PipelineSearchResult>> {
use super::reciprocal_rank_fusion;
let dense_results = self.vector_store
.search(query_embedding.to_vec(), None, k)
.await
.context("Dense search failed")?;
let bm25_guard = bm25.read().await;
let sparse_results = bm25_guard.search(query_text, k)?;
let dense_scores: Vec<(String, f32)> = dense_results
.iter()
.map(|r| (r.id.clone(), r.score))
.collect();
let sparse_scores: Vec<(String, f32)> = sparse_results
.iter()
.map(|r| (r.doc_id.clone(), r.score))
.collect();
let rrf_k = self.config.retrieval.rrf_k;
let fused = reciprocal_rank_fusion(
vec![dense_scores, sparse_scores],
rrf_k,
);
let mut results: Vec<PipelineSearchResult> = Vec::with_capacity(k);
for (id, score) in fused.into_iter().take(k) {
if let Some(dense_match) = dense_results.iter().find(|r| r.id == id) {
results.push(PipelineSearchResult {
id: dense_match.id.clone(),
content: dense_match.content.clone().unwrap_or_default(),
score,
metadata: dense_match.metadata.clone(),
rerank_score: None,
});
} else if sparse_results.iter().any(|r| r.doc_id == id) {
if let Ok(docs) = self.vector_store.get(vec![id.clone()]).await {
if let Some(doc) = docs.into_iter().next() {
results.push(PipelineSearchResult {
id: doc.id,
content: doc.content.unwrap_or_default(),
score,
metadata: doc.metadata,
rerank_score: None,
});
}
}
}
}
Ok(results)
}
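/// Reranks first-stage candidates with the configured cross-encoder,
/// preserving the original retrieval `score` alongside the new
/// `rerank_score` on each surviving result.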
#[cfg(feature = "reranker")]
async fn rerank_results(
&self,
reranker: &dyn Reranker,
query: &str,
candidates: Vec<PipelineSearchResult>,
top_k: usize,
) -> Result<Vec<PipelineSearchResult>> {
if candidates.is_empty() {
return Ok(candidates);
}
let rerank_docs: Vec<RerankDocument> = candidates
.iter()
.map(|r| RerankDocument {
id: r.id.clone(),
text: r.content.clone(),
})
.collect();
let reranked = reranker.rerank(query, rerank_docs, top_k)?;
let results: Vec<PipelineSearchResult> = reranked
.into_iter()
.filter_map(|rr| {
candidates.iter().find(|c| c.id == rr.document.id).map(|c| {
PipelineSearchResult {
id: c.id.clone(),
content: c.content.clone(),
score: c.score,
metadata: c.metadata.clone(),
rerank_score: Some(rr.score),
}
})
})
.collect();
Ok(results)
}
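/// Projects search results into `CompressedToolContext` values for prompt
/// assembly; requires the compressor to have been configured.
///
/// ```ignore
/// let contexts = pipeline.compress_results(&results)?;
/// ```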
#[cfg(feature = "context-compression")]
pub fn compress_results(
&self,
results: &[PipelineSearchResult],
) -> Result<Vec<CompressedToolContext>> {
// Presence check only: the mapping below is a plain projection and does
// not yet apply the configured compression strategy.
let _compressor = self.compressor.as_ref()
.context("Context compression not enabled")?;
let tools: Vec<_> = results
.iter()
.map(|r| {
super::CompressedToolContext {
name: r.metadata.tool_name.clone().unwrap_or_else(|| r.id.clone()),
description: r.content.clone(),
parameters: Vec::new(),
example: None,
score: r.rerank_score.unwrap_or(r.score),
}
})
.collect();
Ok(tools)
}
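/// Probes every configured stage and reports per-component status; the
/// optional stages report `None` when compiled out or disabled.
///
/// ```ignore
/// let health = pipeline.health_check().await;
/// if !health.healthy {
///     eprintln!("embedding error: {:?}", health.embedding_provider.error);
/// }
/// ```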
pub async fn health_check(&self) -> PipelineHealth {
let mut healthy = true;
let embedding_status = match self.embedding_provider.health_check().await {
Ok(true) => ProviderStatus {
name: self.embedding_provider.provider_name().to_string(),
healthy: true,
error: None,
},
Ok(false) => {
healthy = false;
ProviderStatus {
name: self.embedding_provider.provider_name().to_string(),
healthy: false,
error: Some("Provider reported unhealthy".to_string()),
}
}
Err(e) => {
healthy = false;
ProviderStatus {
name: self.embedding_provider.provider_name().to_string(),
healthy: false,
error: Some(e.to_string()),
}
}
};
let vector_status = match self.vector_store.health_check().await {
Ok(status) => ProviderStatus {
name: self.vector_store.backend_name().to_string(),
healthy: status.healthy,
error: if status.healthy { None } else { Some("Unhealthy".to_string()) },
},
Err(e) => {
healthy = false;
ProviderStatus {
name: self.vector_store.backend_name().to_string(),
healthy: false,
error: Some(e.to_string()),
}
}
};
#[cfg(feature = "hybrid-search")]
let bm25_status = if self.bm25_index.is_some() {
Some(ProviderStatus {
name: "BM25 (Tantivy)".to_string(),
healthy: true,
error: None,
})
} else {
None
};
#[cfg(not(feature = "hybrid-search"))]
let bm25_status: Option<ProviderStatus> = None;
#[cfg(feature = "reranker")]
let reranker_status = if let Some(ref reranker) = self.reranker {
Some(ProviderStatus {
name: reranker.model_name().to_string(),
healthy: true,
error: None,
})
} else {
None
};
#[cfg(not(feature = "reranker"))]
let reranker_status: Option<ProviderStatus> = None;
#[cfg(feature = "ai-ingestion")]
let generator_status = if let Some(ref generator) = self.example_generator {
Some(ProviderStatus {
name: format!("{}/{}", generator.provider_name(), generator.model_name()),
healthy: true,
error: None,
})
} else {
None
};
#[cfg(not(feature = "ai-ingestion"))]
let generator_status: Option<ProviderStatus> = None;
let indexed = self.vector_store.count(None).await.unwrap_or(0);
PipelineHealth {
healthy,
embedding_provider: embedding_status,
vector_store: vector_status,
bm25_index: bm25_status,
reranker: reranker_status,
example_generator: generator_status,
indexed_documents: indexed,
}
}
pub async fn document_count(&self) -> Result<usize> {
self.vector_store.count(None).await
}
pub async fn clear(&self) -> Result<()> {
warn!("Clear not fully implemented - documents may persist");
Ok(())
}
pub fn config(&self) -> &SearchConfig {
&self.config
}
pub fn embedding_info(&self) -> (&str, &str, usize) {
(
self.embedding_provider.provider_name(),
self.embedding_provider.model_name(),
self.embedding_provider.dimensions(),
)
}
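/// Registers a skill name with the query processor so it can be matched
/// during query processing. Each call rebuilds the processor, so prefer
/// the batch variants when registering many names:
///
/// ```ignore
/// pipeline.add_known_skills(["kubernetes", "aws"]);
/// pipeline.add_known_tools(["list-pods", "create-bucket"]);
/// let processed = pipeline.process_query("how do I list k8s pods?");
/// ```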
pub fn add_known_skill(&mut self, skill_name: &str) {
self.known_skills.push(skill_name.to_string());
self.rebuild_query_processor();
}
pub fn add_known_tool(&mut self, tool_name: &str) {
self.known_tools.push(tool_name.to_string());
self.rebuild_query_processor();
}
pub fn add_known_skills(&mut self, skills: impl IntoIterator<Item = impl Into<String>>) {
for skill in skills {
self.known_skills.push(skill.into());
}
self.rebuild_query_processor();
}
pub fn add_known_tools(&mut self, tools: impl IntoIterator<Item = impl Into<String>>) {
for tool in tools {
self.known_tools.push(tool.into());
}
self.rebuild_query_processor();
}
fn rebuild_query_processor(&mut self) {
self.query_processor = QueryProcessor::new()
.with_skills(self.known_skills.iter().cloned())
.with_tools(self.known_tools.iter().cloned());
}
pub fn process_query(&self, query: &str) -> ProcessedQuery {
self.query_processor.process(query)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_pipeline_creation() {
let config = SearchConfig::default();
let pipeline = SearchPipeline::from_config(config).await;
assert!(pipeline.is_ok());
}
#[tokio::test]
async fn test_pipeline_index_and_search() {
let config = SearchConfig::default();
let pipeline = SearchPipeline::from_config(config).await.unwrap();
let docs = vec![
IndexDocument {
id: "1".to_string(),
content: "List all Kubernetes pods in the cluster".to_string(),
metadata: DocumentMetadata {
skill_name: Some("kubernetes".to_string()),
tool_name: Some("list-pods".to_string()),
..Default::default()
},
},
IndexDocument {
id: "2".to_string(),
content: "Deploy a new application to Kubernetes".to_string(),
metadata: DocumentMetadata {
skill_name: Some("kubernetes".to_string()),
tool_name: Some("deploy".to_string()),
..Default::default()
},
},
IndexDocument {
id: "3".to_string(),
content: "Create an S3 bucket in AWS".to_string(),
metadata: DocumentMetadata {
skill_name: Some("aws".to_string()),
tool_name: Some("create-bucket".to_string()),
..Default::default()
},
},
];
let stats = pipeline.index_documents(docs).await.unwrap();
assert_eq!(stats.documents_added, 3);
assert_eq!(stats.total_documents, 3);
let results = pipeline.search("kubernetes pods", 2).await.unwrap();
assert!(!results.is_empty());
assert!(results.len() <= 2);
assert!(results[0].content.to_lowercase().contains("kubernetes"));
}
#[tokio::test]
async fn test_pipeline_health_check() {
let config = SearchConfig::default();
let pipeline = SearchPipeline::from_config(config).await.unwrap();
let health = pipeline.health_check().await;
assert!(health.healthy);
assert!(health.embedding_provider.healthy);
assert!(health.vector_store.healthy);
}
#[tokio::test]
async fn test_query_processing() {
let config = SearchConfig::default();
let mut pipeline = SearchPipeline::from_config(config).await.unwrap();
pipeline.add_known_skill("kubernetes");
pipeline.add_known_tool("list-pods");
let processed = pipeline.process_query("how do I list k8s pods?");
assert!(!processed.normalized.is_empty());
}
#[tokio::test]
async fn test_empty_search() {
let config = SearchConfig::default();
let pipeline = SearchPipeline::from_config(config).await.unwrap();
let results = pipeline.search("kubernetes", 5).await.unwrap();
assert!(results.is_empty());
}
#[tokio::test]
async fn test_search_with_filter() {
let config = SearchConfig::default();
let pipeline = SearchPipeline::from_config(config).await.unwrap();
let docs = vec![
IndexDocument {
id: "1".to_string(),
content: "Kubernetes pods".to_string(),
metadata: DocumentMetadata {
skill_name: Some("kubernetes".to_string()),
..Default::default()
},
},
IndexDocument {
id: "2".to_string(),
content: "AWS S3 bucket".to_string(),
metadata: DocumentMetadata {
skill_name: Some("aws".to_string()),
..Default::default()
},
},
];
pipeline.index_documents(docs).await.unwrap();
let filter = Filter::new().skill("kubernetes");
let results = pipeline.search_with_filter("bucket", filter, 5).await.unwrap();
for result in &results {
assert_eq!(result.metadata.skill_name.as_deref(), Some("kubernetes"));
}
}
}