use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lancedb::{
connect,
index::scalar::FullTextSearchQuery,
query::{ExecutableQuery, QueryBase},
Connection, DistanceType, Table,
};
use tokio::sync::RwLock;
use self::{
batch_converter::BatchConverter, block_trait::BlockType, debug::DebugOperations,
graphrag::GraphRagOperations, metadata::MetadataOperations, table_ops::TableOperations,
vector_optimizer::VectorOptimizer,
};
pub mod batch_converter;
pub mod block_trait;
pub mod debug;
pub mod graphrag;
#[cfg(test)]
mod hybrid_tests;
pub mod metadata;
pub mod table_ops;
pub mod vector_optimizer;
/// A chunk of source code extracted from an indexed file, with the
/// location metadata needed to display and deduplicate it.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CodeBlock {
/// Path of the source file this block came from.
pub path: String,
/// Language label assigned by the indexer.
pub language: String,
/// Raw source text of the block.
pub content: String,
/// Symbols (names) detected in this block.
pub symbols: Vec<String>,
/// First line of the block in the source file.
pub start_line: usize,
/// Last line of the block in the source file.
pub end_line: usize,
/// Content hash used for change detection / deduplication.
pub hash: String,
/// Search distance; populated only on query results, and omitted from
/// serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub distance: Option<f32>,
}
/// A chunk of plain text extracted from an indexed file (same shape as
/// `CodeBlock` but without symbol information).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TextBlock {
/// Path of the source file this block came from.
pub path: String,
/// Language label assigned by the indexer.
pub language: String,
/// Raw text of the block.
pub content: String,
/// First line of the block in the source file.
pub start_line: usize,
/// Last line of the block in the source file.
pub end_line: usize,
/// Content hash used for change detection / deduplication.
pub hash: String,
/// Search distance; populated only on query results, and omitted from
/// serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub distance: Option<f32>,
}
/// A section of a document (e.g. a markdown heading and its body)
/// extracted from an indexed file.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DocumentBlock {
    /// Path of the source document.
    pub path: String,
    /// Heading/title text of this section.
    pub title: String,
    /// Body text of the section.
    pub content: String,
    /// Surrounding heading context for this section.
    // NOTE(review): presumably ancestor headings — confirm with the indexer.
    pub context: Vec<String>,
    /// Heading nesting level.
    pub level: usize,
    /// First line of the section in the source file.
    pub start_line: usize,
    /// Last line of the section in the source file.
    pub end_line: usize,
    /// Content hash used for change detection / deduplication.
    pub hash: String,
    /// Search distance; populated only on query results, and omitted from
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub distance: Option<f32>,
}
/// Parameters for a combined vector + keyword (full-text) search.
#[derive(Debug, Clone)]
pub struct HybridSearchQuery {
    /// Optional embedding driving the vector (ANN) side of the search.
    pub vector_query: Option<Vec<f32>>,
    /// Optional keyword string driving the full-text side of the search.
    pub keywords: Option<String>,
    /// Relative weight of vector results; must lie in `[0.0, 1.0]`.
    pub vector_weight: f32,
    /// Relative weight of keyword results; must lie in `[0.0, 1.0]`.
    pub keyword_weight: f32,
    /// Maximum number of results to return.
    pub limit: usize,
    /// Minimum similarity a hit must reach to be kept (converted to a
    /// distance threshold as `1.0 - min_relevance` by the search path).
    pub min_relevance: Option<f32>,
    /// Restrict results to a single language when set.
    pub language_filter: Option<String>,
}
impl HybridSearchQuery {
    /// Validates the query.
    ///
    /// # Errors
    /// Returns a human-readable message when either weight falls outside
    /// `[0.0, 1.0]` (NaN weights are rejected too, since
    /// `RangeInclusive::contains` is false for NaN), or when neither a
    /// vector query nor keywords are provided.
    pub fn validate(&self) -> Result<(), String> {
        if !(0.0..=1.0).contains(&self.vector_weight) {
            return Err(format!(
                "vector_weight must be in [0.0, 1.0], got {}",
                self.vector_weight
            ));
        }
        if !(0.0..=1.0).contains(&self.keyword_weight) {
            return Err(format!(
                "keyword_weight must be in [0.0, 1.0], got {}",
                self.keyword_weight
            ));
        }
        if self.vector_query.is_none() && self.keywords.is_none() {
            return Err("At least one of vector_query or keywords must be provided".to_string());
        }
        Ok(())
    }
}
/// Handle to the project-local LanceDB database plus cached table handles
/// and the embedding dimensions resolved from configuration.
pub struct Store {
    /// Open LanceDB connection for this project's index.
    db: Connection,
    /// Embedding dimension used by code-oriented tables.
    code_vector_dim: usize,
    /// Embedding dimension used by text/document tables.
    text_vector_dim: usize,
    /// Cache of opened table handles, keyed by table name.
    table_cache: Arc<RwLock<HashMap<String, Arc<Table>>>>,
}
// Debug-build breadcrumb to help trace connection lifetimes. No explicit
// cleanup happens here — presumably the underlying `Connection` releases
// its resources in its own drop; nothing in this file closes it manually.
impl Drop for Store {
fn drop(&mut self) {
if cfg!(debug_assertions) {
tracing::debug!("Store instance dropped, database connection released");
}
}
}
impl Store {
/// Opens (or creates) the project-local LanceDB database and resolves the
/// embedding dimensions for the configured code and text models.
///
/// If an existing table's `embedding` column dimension no longer matches
/// the configured model (e.g. after a model change), that table is dropped
/// so it can be recreated with the correct schema; drop failures are only
/// logged.
///
/// # Errors
/// Fails when the storage path cannot be resolved or created, configuration
/// cannot be loaded, a model spec cannot be parsed, the embedding dimension
/// cannot be determined, or the database cannot be opened.
pub async fn new() -> Result<Self> {
    let current_dir = std::env::current_dir()?;
    // Fix: these calls previously contained a mangled `&current_dir`
    // token (mojibake), which does not compile.
    let index_path = crate::storage::get_project_database_path(&current_dir)?;
    crate::storage::ensure_project_storage_exists(&current_dir)?;
    if !index_path.exists() {
        std::fs::create_dir_all(&index_path)?;
    }
    let storage_path = index_path
        .to_str()
        .ok_or_else(|| anyhow::anyhow!("Invalid database path"))?;
    let config = crate::config::Config::load()?;
    let (code_provider, code_model) =
        crate::embedding::parse_provider_model(&config.embedding.code_model)
            .map_err(|e| anyhow::anyhow!("Failed to parse code model: {}", e))?;
    let code_vector_dim = config
        .embedding
        .get_vector_dimension(&code_provider, &code_model)
        .await?;
    let (text_provider, text_model) =
        crate::embedding::parse_provider_model(&config.embedding.text_model)
            .map_err(|e| anyhow::anyhow!("Failed to parse text model: {}", e))?;
    let text_vector_dim = config
        .embedding
        .get_vector_dimension(&text_provider, &text_model)
        .await?;
    let db = connect(storage_path).execute().await?;
    // Detect dimension drift between stored table schemas and the current
    // configuration; mismatched tables are dropped for later recreation.
    let table_names = db.table_names().execute().await?;
    for table_name in [
        "code_blocks",
        "text_blocks",
        "document_blocks",
        "graphrag_nodes",
    ] {
        if !table_names.contains(&table_name.to_string()) {
            continue;
        }
        if let Ok(table) = db.open_table(table_name).execute().await {
            if let Ok(schema) = table.schema().await {
                if let Ok(field) = schema.field_with_name("embedding") {
                    if let DataType::FixedSizeList(_, size) = field.data_type() {
                        let expected_dim = match table_name {
                            "code_blocks" | "graphrag_nodes" => code_vector_dim as i32,
                            "text_blocks" | "document_blocks" => text_vector_dim as i32,
                            _ => continue,
                        };
                        if *size != expected_dim {
                            tracing::warn!(
                                "Schema mismatch detected for table '{}': expected dimension {}, found {}. Dropping table for recreation.",
                                table_name,
                                expected_dim,
                                size
                            );
                            // Release our handle before dropping the table.
                            drop(table);
                            if let Err(e) = db.drop_table(table_name, &[]).await {
                                tracing::warn!(
                                    "Failed to drop table {}: {}",
                                    table_name,
                                    e
                                );
                            }
                        }
                    }
                }
            }
        }
    }
    Ok(Self {
        db,
        code_vector_dim,
        text_vector_dim,
        table_cache: Arc::new(RwLock::new(HashMap::new())),
    })
}
/// Returns a cached handle to `table_name`, opening and caching the table
/// on first access.
///
/// Uses double-checked locking: a fast path under the shared read lock,
/// then a re-check under the write lock so two tasks racing on the same
/// miss do not both open the table.
async fn get_table(&self, table_name: &str) -> Result<Arc<Table>> {
{
// Fast path: cache hit under the read lock only.
let cache = self.table_cache.read().await;
if let Some(table) = cache.get(table_name) {
return Ok(Arc::clone(table));
}
}
// Miss: take the write lock and re-check, since another task may have
// inserted the entry between our read and write lock acquisitions.
let mut cache = self.table_cache.write().await;
if let Some(table) = cache.get(table_name) {
return Ok(Arc::clone(table));
}
let table = self.db.open_table(table_name).execute().await?;
let table = Arc::new(table);
cache.insert(table_name.to_string(), Arc::clone(&table));
Ok(table)
}
pub async fn initialize_collections(&self) -> Result<()> {
let table_names = self.db.table_names().execute().await?;
if !table_names.contains(&"code_blocks".to_string()) {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("path", DataType::Utf8, false),
Field::new("language", DataType::Utf8, false),
Field::new("content", DataType::Utf8, false),
Field::new("symbols", DataType::Utf8, true),
Field::new("start_line", DataType::UInt32, false),
Field::new("end_line", DataType::UInt32, false),
Field::new("hash", DataType::Utf8, false),
Field::new(
"embedding",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
self.code_vector_dim as i32,
),
true,
),
]));
let _table = self
.db
.create_empty_table("code_blocks", schema)
.execute()
.await?;
}
if !table_names.contains(&"text_blocks".to_string()) {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("path", DataType::Utf8, false),
Field::new("language", DataType::Utf8, false),
Field::new("content", DataType::Utf8, false),
Field::new("start_line", DataType::UInt32, false),
Field::new("end_line", DataType::UInt32, false),
Field::new("hash", DataType::Utf8, false),
Field::new(
"embedding",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
self.text_vector_dim as i32,
),
true,
),
]));
let _table = self
.db
.create_empty_table("text_blocks", schema)
.execute()
.await?;
}
if !table_names.contains(&"document_blocks".to_string()) {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("path", DataType::Utf8, false),
Field::new("title", DataType::Utf8, false),
Field::new("content", DataType::Utf8, false),
Field::new(
"context",
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
true,
),
Field::new("level", DataType::UInt32, false),
Field::new("start_line", DataType::UInt32, false),
Field::new("end_line", DataType::UInt32, false),
Field::new("hash", DataType::Utf8, false),
Field::new(
"embedding",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
self.text_vector_dim as i32,
),
true,
),
]));
let _table = self
.db
.create_empty_table("document_blocks", schema)
.execute()
.await?;
}
Ok(())
}
/// Returns whether a block with `hash` already exists in `collection`.
pub async fn content_exists(&self, hash: &str, collection: &str) -> Result<bool> {
    TableOperations::new(&self.db)
        .content_exists(hash, collection)
        .await
}
/// Stores code blocks with their embeddings (code-model dimension).
pub async fn store_code_blocks(
    &self,
    blocks: &[CodeBlock],
    embeddings: &[Vec<f32>],
) -> Result<()> {
    self.store_blocks(blocks, embeddings, self.code_vector_dim)
        .await
}
/// Stores plain-text blocks with their embeddings (text-model dimension).
pub async fn store_text_blocks(
    &self,
    blocks: &[TextBlock],
    embeddings: &[Vec<f32>],
) -> Result<()> {
    self.store_blocks(blocks, embeddings, self.text_vector_dim)
        .await
}
/// Stores document blocks with their embeddings (text-model dimension).
pub async fn store_document_blocks(
    &self,
    blocks: &[DocumentBlock],
    embeddings: &[Vec<f32>],
) -> Result<()> {
    self.store_blocks(blocks, embeddings, self.text_vector_dim)
        .await
}
/// Vector search over code blocks with default limit and no threshold.
pub async fn get_code_blocks(&self, embedding: Vec<f32>) -> Result<Vec<CodeBlock>> {
    self.get_code_blocks_with_language_filter(embedding, None, None, None)
        .await
}
/// Vector search over code blocks with an optional limit and distance
/// threshold.
pub async fn get_code_blocks_with_config(
    &self,
    embedding: Vec<f32>,
    limit: Option<usize>,
    distance_threshold: Option<f32>,
) -> Result<Vec<CodeBlock>> {
    self.get_code_blocks_with_language_filter(embedding, limit, distance_threshold, None)
        .await
}
/// Vector search over code blocks with optional limit, distance threshold,
/// and language restriction.
pub async fn get_code_blocks_with_language_filter(
    &self,
    embedding: Vec<f32>,
    limit: Option<usize>,
    distance_threshold: Option<f32>,
    language_filter: Option<&str>,
) -> Result<Vec<CodeBlock>> {
    self.get_blocks_with_config(
        embedding,
        limit,
        distance_threshold,
        language_filter,
        self.code_vector_dim,
    )
    .await
}
/// Vector search over text blocks with default limit and no threshold.
pub async fn get_text_blocks(&self, embedding: Vec<f32>) -> Result<Vec<TextBlock>> {
    self.get_text_blocks_with_config(embedding, None, None)
        .await
}
/// Vector search over text blocks with an optional limit and distance
/// threshold.
pub async fn get_text_blocks_with_config(
    &self,
    embedding: Vec<f32>,
    limit: Option<usize>,
    distance_threshold: Option<f32>,
) -> Result<Vec<TextBlock>> {
    self.get_blocks_with_config(embedding, limit, distance_threshold, None, self.text_vector_dim)
        .await
}
/// Vector search over document blocks with default limit and no threshold.
pub async fn get_document_blocks(&self, embedding: Vec<f32>) -> Result<Vec<DocumentBlock>> {
    self.get_document_blocks_with_config(embedding, None, None)
        .await
}
/// Vector search over document blocks with an optional limit and distance
/// threshold.
pub async fn get_document_blocks_with_config(
    &self,
    embedding: Vec<f32>,
    limit: Option<usize>,
    distance_threshold: Option<f32>,
) -> Result<Vec<DocumentBlock>> {
    self.get_blocks_with_config(embedding, limit, distance_threshold, None, self.text_vector_dim)
        .await
}
/// Removes every indexed block (and GraphRAG data) originating from
/// `file_path` across all content tables.
pub async fn remove_blocks_by_path(&self, file_path: &str) -> Result<()> {
    let table_ops = TableOperations::new(&self.db);
    for table in [
        "code_blocks",
        "text_blocks",
        "document_blocks",
        "graphrag_nodes",
    ] {
        table_ops.remove_blocks_by_path(file_path, table).await?;
    }
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .remove_graph_relationships_by_path(file_path)
        .await?;
    Ok(())
}
/// Returns the set of file paths present in any content table.
pub async fn get_all_indexed_file_paths(&self) -> Result<std::collections::HashSet<String>> {
    TableOperations::new(&self.db)
        .get_all_indexed_file_paths(&["code_blocks", "text_blocks", "document_blocks"])
        .await
}
/// Flushes all tables to storage.
pub async fn flush(&self) -> Result<()> {
    TableOperations::new(&self.db).flush_all_tables().await
}
/// Consumes the store; the connection is released when `self` drops.
pub async fn close(self) -> Result<()> {
    Ok(())
}
/// Clears every table managed by the store.
pub async fn clear_all_tables(&self) -> Result<()> {
    TableOperations::new(&self.db).clear_all_tables().await
}
/// Clears only the `code_blocks` table.
pub async fn clear_code_table(&self) -> Result<()> {
    TableOperations::new(&self.db).clear_table("code_blocks").await
}
/// Stores `blocks` with their `embeddings` in `B::TABLE_NAME`, then makes
/// sure the table has a vector index — creating one if missing, or
/// recreating it when the dataset has grown enough — and, when hybrid
/// search is enabled in config, an FTS index.
///
/// Index-maintenance failures are logged and swallowed deliberately: the
/// data has already been written, and searches still work (more slowly)
/// without an index.
///
/// # Errors
/// Fails when batch conversion or the write itself fails.
pub async fn store_blocks<B: BlockType>(
    &self,
    blocks: &[B],
    embeddings: &[Vec<f32>],
    vector_dim: usize,
) -> Result<()> {
    let batch = B::to_batch(blocks, embeddings, vector_dim)?;
    let table_ops = TableOperations::new(&self.db);
    table_ops.store_batch(B::TABLE_NAME, batch).await?;
    if let Ok(table) = self.db.open_table(B::TABLE_NAME).execute().await {
        let row_count = table.count_rows(None).await?;
        let indices = table.list_indices().await?;
        // Compare against a slice literal to avoid allocating a Vec on
        // every index comparison.
        let has_vector_index = indices.iter().any(|idx| idx.columns == ["embedding"]);
        if !has_vector_index {
            if let Err(e) = table_ops
                .create_vector_index_optimized(B::TABLE_NAME, "embedding", vector_dim)
                .await
            {
                tracing::warn!("Failed to create optimized vector index: {}", e);
            }
        } else if VectorOptimizer::should_optimize_for_growth(row_count, vector_dim, true) {
            tracing::info!(
                "Dataset growth detected, optimizing {} index",
                B::TABLE_NAME
            );
            if let Err(e) = table_ops
                .recreate_vector_index_optimized(B::TABLE_NAME, "embedding", vector_dim)
                .await
            {
                tracing::warn!("Failed to recreate optimized vector index: {}", e);
            }
        }
        if let Ok(config) = crate::config::Config::load() {
            if config.search.hybrid.enabled {
                if let Err(e) = table_ops.create_fts_index(B::TABLE_NAME).await {
                    tracing::warn!("Failed to create FTS index for '{}': {}", B::TABLE_NAME, e);
                }
            }
        }
    }
    Ok(())
}
/// Runs a cosine vector search against `B::TABLE_NAME` and returns the
/// matching blocks sorted by ascending distance (rows without a distance
/// sort last).
///
/// `limit` defaults to 10. `distance_threshold`, when set, drops results
/// whose distance exceeds it. `language_filter` restricts hits to one
/// language. `_vector_dim` is unused here and kept only for call-site
/// compatibility.
///
/// # Errors
/// Fails on table access, query optimization, or stream errors. Returns
/// an empty `Vec` (not an error) when the table does not exist yet.
pub async fn get_blocks_with_config<B: BlockType>(
    &self,
    embedding: Vec<f32>,
    limit: Option<usize>,
    distance_threshold: Option<f32>,
    language_filter: Option<&str>,
    _vector_dim: usize,
) -> Result<Vec<B>> {
    let table_ops = TableOperations::new(&self.db);
    if !table_ops.table_exists(B::TABLE_NAME).await? {
        return Ok(Vec::new());
    }
    let table = self.get_table(B::TABLE_NAME).await?;
    let mut query = table
        .vector_search(embedding)?
        .distance_type(DistanceType::Cosine)
        .limit(limit.unwrap_or(10));
    if let Some(language) = language_filter {
        // Escape embedded quotes so a language value containing `'`
        // cannot break — or inject into — the SQL-style predicate.
        query = query.only_if(format!("language = '{}'", language.replace('\'', "''")));
    }
    query = VectorOptimizer::optimize_query(query, &table, B::TABLE_NAME)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to optimize query: {}", e))?;
    let mut results = query.execute().await?;
    let mut all_blocks = Vec::new();
    while let Some(batch) = results.try_next().await? {
        if batch.num_rows() > 0 {
            let mut blocks = B::from_batch(&batch)?;
            if let Some(max_distance) = distance_threshold {
                blocks.retain(|block| block.distance().is_none_or(|d| d <= max_distance));
            }
            all_blocks.append(&mut blocks);
        }
    }
    // Closest first; unscored rows go to the end.
    all_blocks.sort_by(|a, b| match (a.distance(), b.distance()) {
        (Some(dist_a), Some(dist_b)) => dist_a
            .partial_cmp(&dist_b)
            .unwrap_or(std::cmp::Ordering::Equal),
        (Some(_), None) => std::cmp::Ordering::Less,
        (None, Some(_)) => std::cmp::Ordering::Greater,
        (None, None) => std::cmp::Ordering::Equal,
    });
    Ok(all_blocks)
}
/// Clears only the `document_blocks` table.
pub async fn clear_docs_table(&self) -> Result<()> {
    TableOperations::new(&self.db).clear_table("document_blocks").await
}
/// Clears only the `text_blocks` table.
pub async fn clear_text_table(&self) -> Result<()> {
    TableOperations::new(&self.db).clear_table("text_blocks").await
}
/// Embedding dimension configured for code-oriented tables.
pub fn get_code_vector_dim(&self) -> usize {
    self.code_vector_dim
}
/// Records the last indexed git commit hash.
pub async fn store_git_metadata(&self, commit_hash: &str) -> Result<()> {
    MetadataOperations::new(&self.db)
        .store_git_metadata(commit_hash)
        .await
}
/// Returns the last indexed git commit hash, if one was stored.
pub async fn get_last_commit_hash(&self) -> Result<Option<String>> {
    MetadataOperations::new(&self.db).get_last_commit_hash().await
}
/// Records `mtime` for `file_path` so unchanged files can be skipped later.
pub async fn store_file_metadata(&self, file_path: &str, mtime: u64) -> Result<()> {
    MetadataOperations::new(&self.db)
        .store_file_metadata(file_path, mtime)
        .await
}
/// Returns the stored mtime for `file_path`, when known.
pub async fn get_file_mtime(&self, file_path: &str) -> Result<Option<u64>> {
    MetadataOperations::new(&self.db).get_file_mtime(file_path).await
}
/// Returns the stored mtimes for all tracked files.
pub async fn get_all_file_metadata(&self) -> Result<std::collections::HashMap<String, u64>> {
    MetadataOperations::new(&self.db).get_all_file_metadata().await
}
/// Clears all stored git metadata.
pub async fn clear_git_metadata(&self) -> Result<()> {
    MetadataOperations::new(&self.db).clear_git_metadata().await
}
/// Returns the commit hash recorded by the last GraphRAG indexing run.
pub async fn get_graphrag_last_commit_hash(&self) -> Result<Option<String>> {
    MetadataOperations::new(&self.db)
        .get_graphrag_last_commit_hash()
        .await
}
/// Records the commit hash of a completed GraphRAG indexing run.
pub async fn store_graphrag_commit_hash(&self, commit_hash: &str) -> Result<()> {
    MetadataOperations::new(&self.db)
        .store_graphrag_commit_hash(commit_hash)
        .await
}
/// Returns whether the GraphRAG index needs (re)building.
pub async fn graphrag_needs_indexing(&self) -> Result<bool> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .graphrag_needs_indexing()
        .await
}
/// Loads every code block for GraphRAG processing.
pub async fn get_all_code_blocks_for_graphrag(&self) -> Result<Vec<CodeBlock>> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .get_all_code_blocks_for_graphrag()
        .await
}
/// Persists a batch of graph nodes.
pub async fn store_graph_nodes(&self, node_batch: RecordBatch) -> Result<()> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .store_graph_nodes(node_batch)
        .await
}
/// Persists a batch of graph relationships.
pub async fn store_graph_relationships(&self, rel_batch: RecordBatch) -> Result<()> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .store_graph_relationships(rel_batch)
        .await
}
/// Removes all stored graph nodes.
pub async fn clear_graph_nodes(&self) -> Result<()> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .clear_graph_nodes()
        .await
}
/// Removes all stored graph relationships.
pub async fn clear_graph_relationships(&self) -> Result<()> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .clear_graph_relationships()
        .await
}
/// Removes graph nodes originating from `file_path`; returns the count
/// reported by the underlying operation.
pub async fn remove_graph_nodes_by_path(&self, file_path: &str) -> Result<usize> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .remove_graph_nodes_by_path(file_path)
        .await
}
/// Removes graph relationships originating from `file_path`; returns the
/// count reported by the underlying operation.
pub async fn remove_graph_relationships_by_path(&self, file_path: &str) -> Result<usize> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .remove_graph_relationships_by_path(file_path)
        .await
}
/// Vector search over graph nodes, returning the raw record batch.
pub async fn search_graph_nodes(&self, embedding: &[f32], limit: usize) -> Result<RecordBatch> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .search_graph_nodes(embedding, limit)
        .await
}
/// Returns all graph relationships as a raw record batch.
pub async fn get_graph_relationships(&self) -> Result<RecordBatch> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .get_graph_relationships()
        .await
}
/// Returns the relationships attached to `node_id` in `direction`.
pub async fn get_node_relationships(
    &self,
    node_id: &str,
    direction: crate::indexer::graphrag::types::RelationshipDirection,
) -> Result<Vec<crate::indexer::graphrag::types::CodeRelationship>> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .get_node_relationships(node_id, direction)
        .await
}
/// Returns every relationship of the given type.
pub async fn get_relationships_by_type(
    &self,
    relation_type: &crate::indexer::graphrag::types::RelationType,
) -> Result<Vec<crate::indexer::graphrag::types::CodeRelationship>> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .get_relationships_by_type(relation_type)
        .await
}
/// Returns a page of graph nodes starting at `offset`.
pub async fn get_all_nodes_paginated(
    &self,
    offset: usize,
    limit: usize,
) -> Result<Vec<crate::indexer::graphrag::types::CodeNode>> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .get_all_nodes_paginated(offset, limit)
        .await
}
/// Returns all relationships via the efficient bulk path.
pub async fn get_all_relationships_efficient(
    &self,
) -> Result<Vec<crate::indexer::graphrag::types::CodeRelationship>> {
    GraphRagOperations::new(&self.db, self.code_vector_dim, Arc::clone(&self.table_cache))
        .get_all_relationships_efficient()
        .await
}
/// Prints (via the debug helper) the list of indexed files.
pub async fn list_indexed_files(&self) -> Result<()> {
    DebugOperations::new(&self.db, self.code_vector_dim)
        .list_indexed_files()
        .await
}
/// Prints (via the debug helper) the chunks stored for `file_path`.
pub async fn show_file_chunks(&self, file_path: &str) -> Result<()> {
    DebugOperations::new(&self.db, self.code_vector_dim)
        .show_file_chunks(file_path)
        .await
}
/// Finds one code block whose `symbols` field contains `symbol`.
///
/// Matching is substring-based (`LIKE '%symbol%'`), so `foo` also matches
/// `foobar`; callers needing exact matches must post-filter. Returns
/// `Ok(None)` when the table is missing or nothing matches.
pub async fn get_code_block_by_symbol(&self, symbol: &str) -> Result<Option<CodeBlock>> {
    let table_ops = TableOperations::new(&self.db);
    if !table_ops.table_exists("code_blocks").await? {
        return Ok(None);
    }
    let table = self.get_table("code_blocks").await?;
    // Escape quotes so a symbol containing `'` cannot break the predicate.
    // NOTE(review): `%` and `_` are still interpreted as LIKE wildcards.
    let safe_symbol = symbol.replace('\'', "''");
    let mut results = table
        .query()
        .only_if(format!("symbols LIKE '%{}%'", safe_symbol))
        .limit(1)
        .execute()
        .await?;
    while let Some(batch) = results.try_next().await? {
        if batch.num_rows() > 0 {
            let converter = BatchConverter::new(self.code_vector_dim);
            let code_blocks = converter.batch_to_code_blocks(&batch, None)?;
            return Ok(code_blocks.into_iter().next());
        }
    }
    Ok(None)
}
/// Fetches the code block whose `hash` matches exactly.
///
/// # Errors
/// Fails when the table does not exist, the hash is not found, or the
/// matching row cannot be converted.
pub async fn get_code_block_by_hash(&self, hash: &str) -> Result<CodeBlock> {
    let table_ops = TableOperations::new(&self.db);
    if !table_ops.table_exists("code_blocks").await? {
        return Err(anyhow::anyhow!("Code blocks table does not exist"));
    }
    let table = self.get_table("code_blocks").await?;
    // Escape quotes so the equality predicate cannot be broken/injected.
    let safe_hash = hash.replace('\'', "''");
    let mut results = table
        .query()
        .only_if(format!("hash = '{}'", safe_hash))
        .limit(1)
        .execute()
        .await?;
    while let Some(batch) = results.try_next().await? {
        if batch.num_rows() > 0 {
            let converter = BatchConverter::new(self.code_vector_dim);
            let code_blocks = converter.batch_to_code_blocks(&batch, None)?;
            return code_blocks
                .into_iter()
                .next()
                .ok_or_else(|| anyhow::anyhow!("Failed to convert result to CodeBlock"));
        }
    }
    Err(anyhow::anyhow!("Code block with hash {} not found", hash))
}
/// Returns whether every table in `table_names` exists.
pub async fn tables_exist(&self, table_names: &[&str]) -> Result<bool> {
    TableOperations::new(&self.db).tables_exist(table_names).await
}
/// Fetches block metadata strings for `file_path` within `table_name`.
pub async fn get_file_blocks_metadata(
    &self,
    file_path: &str,
    table_name: &str,
) -> Result<Vec<String>> {
    TableOperations::new(&self.db)
        .get_file_blocks_metadata(file_path, table_name)
        .await
}
/// Removes from `table_name` all blocks whose hashes appear in `hashes`.
pub async fn remove_blocks_by_hashes(&self, hashes: &[String], table_name: &str) -> Result<()> {
    TableOperations::new(&self.db)
        .remove_blocks_by_hashes(hashes, table_name)
        .await
}
/// Runs hybrid (vector + keyword), vector-only, or keyword-only search
/// over `B::TABLE_NAME`, depending on which signals `query` carries.
///
/// `min_relevance` is translated to a cosine-distance cutoff
/// (`distance = 1.0 - relevance`). Results are capped at `query.limit`
/// and, when distances are available, sorted closest-first with unscored
/// rows last — consistent with `get_blocks_with_config`.
///
/// # Errors
/// Fails on an invalid query, table/index access errors, or stream errors.
/// Returns an empty `Vec` (not an error) when the table does not exist.
pub async fn hybrid_search<B: BlockType>(&self, query: &HybridSearchQuery) -> Result<Vec<B>> {
    query
        .validate()
        .map_err(|e| anyhow::anyhow!("Invalid hybrid query: {}", e))?;
    let table_ops = TableOperations::new(&self.db);
    if !table_ops.table_exists(B::TABLE_NAME).await? {
        return Ok(Vec::new());
    }
    let table = self.get_table(B::TABLE_NAME).await?;
    // Relevance is a similarity; convert it to a distance bound.
    let distance_threshold = query.min_relevance.map(|sim| 1.0 - sim);
    let limit = query.limit;
    match (&query.vector_query, &query.keywords) {
        (Some(embedding), Some(kw_query)) => {
            let indices = table.list_indices().await?;
            let has_fts = indices
                .iter()
                .any(|idx| idx.index_type == lancedb::index::IndexType::FTS);
            if !has_fts {
                table_ops.create_fts_index(B::TABLE_NAME).await?;
            }
            let mut vq = table
                .vector_search(embedding.clone())?
                .distance_type(DistanceType::Cosine)
                .limit(limit)
                .full_text_search(FullTextSearchQuery::new(kw_query.clone()));
            if let Some(lang) = query.language_filter.as_deref() {
                // Escape embedded quotes so the predicate can't be broken.
                vq = vq.only_if(format!("language = '{}'", lang.replace('\'', "''")));
            }
            vq = VectorOptimizer::optimize_query(vq, &table, B::TABLE_NAME)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to optimize query: {}", e))?;
            let mut stream = vq.execute().await?;
            let mut blocks = Vec::new();
            while let Some(batch) = stream.try_next().await? {
                if batch.num_rows() > 0 {
                    let mut batch_blocks = B::from_batch(&batch)?;
                    if let Some(thresh) = distance_threshold {
                        batch_blocks.retain(|b| b.distance().is_none_or(|d| d <= thresh));
                    }
                    blocks.append(&mut batch_blocks);
                }
            }
            // Sort scored results closest-first with unscored rows last,
            // matching `get_blocks_with_config`. (Comparing the raw
            // `Option`s would instead rank `None` first.)
            blocks.sort_by(|a, b| match (a.distance(), b.distance()) {
                (Some(dist_a), Some(dist_b)) => dist_a
                    .partial_cmp(&dist_b)
                    .unwrap_or(std::cmp::Ordering::Equal),
                (Some(_), None) => std::cmp::Ordering::Less,
                (None, Some(_)) => std::cmp::Ordering::Greater,
                (None, None) => std::cmp::Ordering::Equal,
            });
            blocks.truncate(limit);
            Ok(blocks)
        }
        (Some(embedding), None) => {
            // Pure vector search: reuse the standard path.
            self.get_blocks_with_config::<B>(
                embedding.clone(),
                Some(limit),
                distance_threshold,
                query.language_filter.as_deref(),
                0,
            )
            .await
        }
        (None, Some(kw_query)) => {
            let indices = table.list_indices().await?;
            let has_fts = indices
                .iter()
                .any(|idx| idx.index_type == lancedb::index::IndexType::FTS);
            if !has_fts {
                table_ops.create_fts_index(B::TABLE_NAME).await?;
            }
            let mut q = table
                .query()
                .full_text_search(FullTextSearchQuery::new(kw_query.clone()))
                .limit(limit);
            if let Some(lang) = query.language_filter.as_deref() {
                q = q.only_if(format!("language = '{}'", lang.replace('\'', "''")));
            }
            let mut stream = q.execute().await?;
            let mut blocks = Vec::new();
            while let Some(batch) = stream.try_next().await? {
                if batch.num_rows() > 0 {
                    blocks.append(&mut B::from_batch(&batch)?);
                }
            }
            blocks.truncate(limit);
            Ok(blocks)
        }
        // validate() guarantees at least one search signal is present.
        (None, None) => unreachable!("validate() ensures at least one signal"),
    }
}
/// Ensures a full-text-search index exists on `table_name`.
pub async fn ensure_fts_index(&self, table_name: &str) -> Result<()> {
    TableOperations::new(&self.db).create_fts_index(table_name).await
}
}