use qdrant_client::qdrant::{
CreateCollectionBuilder, CreateFieldIndexCollectionBuilder, Distance, FieldType, Filter,
PointStruct, ScalarQuantizationBuilder, ScoredPoint, VectorParamsBuilder,
};
use zeph_memory::QdrantOps;
use crate::error::Result;
/// Name of the Qdrant collection holding embedded code chunks.
const CODE_COLLECTION: &str = "zeph_code_chunks";
/// Vector store for code chunks: embeddings live in Qdrant, while chunk
/// metadata (file path, content hash, line range, ...) is mirrored in SQLite
/// so existence checks and file listings avoid round-trips to Qdrant.
#[derive(Clone)]
pub struct CodeStore {
    // Thin client wrapper around the Qdrant HTTP API.
    ops: QdrantOps,
    // Collection name; always `CODE_COLLECTION` today, kept as a field so it
    // can be varied without touching call sites.
    collection: String,
    // SQLite pool for the `chunk_metadata` mirror table.
    pool: sqlx::SqlitePool,
}
/// Borrowed view of one code chunk to be inserted, as produced by the parser.
/// All string fields borrow from the caller to avoid copies during indexing.
pub struct ChunkInsert<'a> {
    pub file_path: &'a str,
    pub language: &'a str,
    // Syntax-tree node kind, e.g. "function_item".
    pub node_type: &'a str,
    // Name of the function/type the chunk defines, when one exists.
    pub entity_name: Option<&'a str>,
    pub line_start: usize,
    pub line_end: usize,
    pub code: &'a str,
    // Enclosing scopes joined into a single string (format set by the caller).
    pub scope_chain: &'a str,
    // Hash of the chunk text; used for dedup via `chunk_exists`.
    pub content_hash: &'a str,
}
/// One semantic-search result, reconstructed from a Qdrant point payload.
#[derive(Debug)]
pub struct SearchHit {
    pub code: String,
    pub file_path: String,
    // (line_start, line_end) of the chunk within the file.
    pub line_range: (usize, usize),
    // Similarity score reported by Qdrant (cosine distance space).
    pub score: f32,
    pub node_type: String,
    pub entity_name: Option<String>,
    // Empty string when the payload carried no scope chain.
    pub scope_chain: String,
}
impl CodeStore {
    /// Build a store talking to the Qdrant instance at `qdrant_url`, using
    /// `pool` for the SQLite metadata mirror.
    ///
    /// # Errors
    /// Returns an error if the Qdrant client cannot be constructed.
    pub fn new(qdrant_url: &str, pool: sqlx::SqlitePool) -> Result<Self> {
        let ops = QdrantOps::new(qdrant_url).map_err(crate::error::IndexError::Qdrant)?;
        Ok(Self {
            ops,
            collection: CODE_COLLECTION.into(),
            pool,
        })
    }

    /// Create the code collection (cosine distance, scalar quantization) plus
    /// keyword payload indexes, if it does not exist yet. Idempotent.
    ///
    /// # Errors
    /// Returns an error if any Qdrant call fails.
    pub async fn ensure_collection(&self, vector_size: u64) -> Result<()> {
        if self.ops.collection_exists(&self.collection).await? {
            return Ok(());
        }
        self.ops
            .client()
            .create_collection(
                CreateCollectionBuilder::new(&self.collection)
                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
                    .quantization_config(ScalarQuantizationBuilder::default()),
            )
            .await
            .map_err(Box::new)?;
        // Keyword indexes make filtered searches on these payload fields fast.
        for field in ["language", "file_path", "node_type"] {
            self.ops
                .client()
                .create_field_index(CreateFieldIndexCollectionBuilder::new(
                    &self.collection,
                    field,
                    FieldType::Keyword,
                ))
                .await
                .map_err(Box::new)?;
        }
        Ok(())
    }

    /// Upsert one chunk: store `vector` + payload in Qdrant under a fresh
    /// UUID, then mirror the metadata into SQLite. Returns the new point id.
    ///
    /// NOTE(review): the two writes are not atomic — a SQLite failure after a
    /// successful Qdrant upsert leaves an orphaned point.
    ///
    /// # Errors
    /// Returns an error if payload conversion, the Qdrant upsert, the
    /// usize→i64 line conversions, or the SQLite insert fails.
    pub async fn upsert_chunk(&self, chunk: &ChunkInsert<'_>, vector: Vec<f32>) -> Result<String> {
        let point_id = uuid::Uuid::new_v4().to_string();
        let payload = QdrantOps::json_to_payload(serde_json::json!({
            "file_path": chunk.file_path,
            "language": chunk.language,
            "node_type": chunk.node_type,
            "entity_name": chunk.entity_name,
            "line_start": chunk.line_start,
            "line_end": chunk.line_end,
            "code": chunk.code,
            "scope_chain": chunk.scope_chain,
            "content_hash": chunk.content_hash,
        }))?;
        self.ops
            .upsert(
                &self.collection,
                vec![PointStruct::new(point_id.clone(), vector, payload)],
            )
            .await?;
        // SQLite has no unsigned integer type; store line numbers as i64.
        let line_start = i64::try_from(chunk.line_start)?;
        let line_end = i64::try_from(chunk.line_end)?;
        sqlx::query(
            "INSERT OR REPLACE INTO chunk_metadata \
             (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type, entity_name) \
             VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(&point_id)
        .bind(chunk.file_path)
        .bind(chunk.content_hash)
        .bind(line_start)
        .bind(line_end)
        .bind(chunk.language)
        .bind(chunk.node_type)
        .bind(chunk.entity_name)
        .execute(&self.pool)
        .await?;
        Ok(point_id)
    }

    /// True if a chunk with this content hash is already indexed
    /// (checked against the SQLite mirror, not Qdrant).
    ///
    /// # Errors
    /// Returns an error if the SQLite query fails.
    pub async fn chunk_exists(&self, content_hash: &str) -> Result<bool> {
        let row: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM chunk_metadata WHERE content_hash = ?")
                .bind(content_hash)
                .fetch_one(&self.pool)
                .await?;
        Ok(row.0 > 0)
    }

    /// Delete every chunk indexed for `file_path` from both Qdrant and the
    /// SQLite mirror. Returns the number of chunks removed.
    ///
    /// # Errors
    /// Returns an error if the SQLite lookup/delete or the Qdrant delete fails.
    pub async fn remove_file_chunks(&self, file_path: &str) -> Result<usize> {
        let ids: Vec<(String,)> =
            sqlx::query_as("SELECT qdrant_id FROM chunk_metadata WHERE file_path = ?")
                .bind(file_path)
                .fetch_all(&self.pool)
                .await?;
        if ids.is_empty() {
            return Ok(0);
        }
        let count = ids.len();
        // Consume the rows so each id String moves into the point list
        // instead of being cloned.
        let point_ids = ids
            .into_iter()
            .map(|(id,)| id.into())
            .collect::<Vec<_>>();
        self.ops.delete_by_ids(&self.collection, point_ids).await?;
        sqlx::query("DELETE FROM chunk_metadata WHERE file_path = ?")
            .bind(file_path)
            .execute(&self.pool)
            .await?;
        Ok(count)
    }

    /// Vector search over the collection, optionally restricted by `filter`.
    /// Points whose payload is missing a required field are silently skipped.
    ///
    /// # Errors
    /// Returns an error if `limit` does not fit in `u64` or the Qdrant search
    /// fails.
    pub async fn search(
        &self,
        query_vector: Vec<f32>,
        limit: usize,
        filter: Option<Filter>,
    ) -> Result<Vec<SearchHit>> {
        let limit_u64 = u64::try_from(limit)?;
        let results = self
            .ops
            .search(&self.collection, query_vector, limit_u64, filter)
            .await?;
        Ok(results
            .iter()
            .filter_map(SearchHit::from_scored_point)
            .collect())
    }

    /// Distinct file paths currently present in the metadata mirror.
    ///
    /// # Errors
    /// Returns an error if the SQLite query fails.
    pub async fn indexed_files(&self) -> Result<Vec<String>> {
        let rows: Vec<(String,)> = sqlx::query_as("SELECT DISTINCT file_path FROM chunk_metadata")
            .fetch_all(&self.pool)
            .await?;
        Ok(rows.into_iter().map(|(p,)| p).collect())
    }
}
impl SearchHit {
    /// Rebuild a `SearchHit` from a scored point's payload.
    ///
    /// Returns `None` when any required field (`code`, `file_path`,
    /// `node_type`, `line_start`, `line_end`) is absent or has the wrong
    /// type; `entity_name` stays optional and a missing `scope_chain`
    /// becomes the empty string.
    fn from_scored_point(point: &ScoredPoint) -> Option<Self> {
        let payload = &point.payload;
        let string_field = |name: &str| {
            payload
                .get(name)
                .and_then(qdrant_client::qdrant::Value::as_str)
                .cloned()
        };
        let usize_field = |name: &str| {
            payload
                .get(name)
                .and_then(qdrant_client::qdrant::Value::as_integer)
                .and_then(|raw| usize::try_from(raw).ok())
        };
        let line_start = usize_field("line_start")?;
        let line_end = usize_field("line_end")?;
        Some(Self {
            code: string_field("code")?,
            file_path: string_field("file_path")?,
            line_range: (line_start, line_end),
            score: point.score,
            node_type: string_field("node_type")?,
            entity_name: string_field("entity_name"),
            scope_chain: string_field("scope_chain").unwrap_or_default(),
        })
    }
}
#[cfg(test)]
mod tests {
    /// Fresh in-memory SQLite pool with the project migrations applied.
    async fn setup_pool() -> sqlx::SqlitePool {
        let pool = sqlx::SqlitePool::connect("sqlite::memory:").await.unwrap();
        zeph_memory::sqlite::SqliteStore::run_migrations(&pool)
            .await
            .unwrap();
        pool
    }

    /// Number of metadata rows whose content hash matches `hash`.
    async fn count_by_hash(pool: &sqlx::SqlitePool, hash: &str) -> i64 {
        let row = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM chunk_metadata WHERE content_hash = ?",
        )
        .bind(hash)
        .fetch_one(pool)
        .await
        .unwrap();
        row.0
    }

    #[tokio::test]
    async fn chunk_exists_returns_false_then_true() {
        let pool = setup_pool().await;
        // No rows yet: the hash must not be found.
        assert_eq!(count_by_hash(&pool, "abc123").await, 0);
        sqlx::query(
            "INSERT INTO chunk_metadata \
             (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
             VALUES (?, ?, ?, ?, ?, ?, ?)",
        )
        .bind("q1")
        .bind("src/main.rs")
        .bind("abc123")
        .bind(1_i64)
        .bind(10_i64)
        .bind("rust")
        .bind("function_item")
        .execute(&pool)
        .await
        .unwrap();
        // After the insert the same lookup must succeed.
        assert!(count_by_hash(&pool, "abc123").await > 0);
    }

    #[tokio::test]
    async fn remove_file_chunks_cleans_sqlite() {
        let pool = setup_pool().await;
        // Seed three chunks that all belong to the same file.
        for i in 0..3 {
            sqlx::query(
                "INSERT INTO chunk_metadata \
                 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
            )
            .bind(format!("q{i}"))
            .bind("src/lib.rs")
            .bind(format!("hash{i}"))
            .bind(1_i64)
            .bind(10_i64)
            .bind("rust")
            .bind("function_item")
            .execute(&pool)
            .await
            .unwrap();
        }
        let stored: Vec<(String,)> =
            sqlx::query_as("SELECT qdrant_id FROM chunk_metadata WHERE file_path = ?")
                .bind("src/lib.rs")
                .fetch_all(&pool)
                .await
                .unwrap();
        assert_eq!(stored.len(), 3);
        // Deleting by file path must remove every one of them.
        sqlx::query("DELETE FROM chunk_metadata WHERE file_path = ?")
            .bind("src/lib.rs")
            .execute(&pool)
            .await
            .unwrap();
        let leftover: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM chunk_metadata WHERE file_path = ?")
                .bind("src/lib.rs")
                .fetch_one(&pool)
                .await
                .unwrap();
        assert_eq!(leftover.0, 0);
    }

    #[tokio::test]
    async fn indexed_files_distinct() {
        let pool = setup_pool().await;
        // "src/a.rs" appears twice; DISTINCT should collapse it.
        for (i, path) in ["src/a.rs", "src/b.rs", "src/a.rs"].iter().enumerate() {
            sqlx::query(
                "INSERT OR REPLACE INTO chunk_metadata \
                 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
            )
            .bind(format!("q{i}"))
            .bind(path)
            .bind(format!("hash{i}"))
            .bind(1_i64)
            .bind(10_i64)
            .bind("rust")
            .bind("function_item")
            .execute(&pool)
            .await
            .unwrap();
        }
        let fetched: Vec<(String,)> =
            sqlx::query_as("SELECT DISTINCT file_path FROM chunk_metadata")
                .fetch_all(&pool)
                .await
                .unwrap();
        let paths: Vec<String> = fetched.into_iter().map(|(p,)| p).collect();
        assert_eq!(paths.len(), 2);
        assert!(paths.contains(&"src/a.rs".to_string()));
        assert!(paths.contains(&"src/b.rs".to_string()));
    }
}