use zeph_memory::{FieldCondition, FieldValue, QdrantOps, VectorFilter, VectorPoint, VectorStore};
use crate::error::Result;
/// Name of the Qdrant collection holding indexed code-chunk vectors.
const CODE_COLLECTION: &str = "zeph_code_chunks";
/// Store for semantically indexed code chunks.
///
/// Embeddings and full payloads live in a Qdrant collection; chunk
/// metadata is mirrored into the SQLite `chunk_metadata` table so that
/// existence checks and file listings avoid vector-store round trips.
#[derive(Clone)]
pub struct CodeStore {
    /// Qdrant operations handle used for all vector reads/writes.
    ops: QdrantOps,
    /// Target collection name (set to `CODE_COLLECTION` by `with_ops`).
    collection: String,
    /// SQLite pool backing the `chunk_metadata` mirror table.
    pool: sqlx::SqlitePool,
}
/// Borrowed description of a single code chunk to be indexed via
/// [`CodeStore::upsert_chunk`]; all string fields borrow from the caller.
pub struct ChunkInsert<'a> {
    /// Path of the source file the chunk came from.
    pub file_path: &'a str,
    /// Language tag stored with the chunk (tests use "rust").
    pub language: &'a str,
    /// Syntax-node kind (tests use values like "function_item").
    pub node_type: &'a str,
    /// Name of the function/struct/etc., when the node has one.
    pub entity_name: Option<&'a str>,
    /// First line of the chunk; must fit in `i64` when persisted.
    pub line_start: usize,
    /// Last line of the chunk; must fit in `i64` when persisted.
    pub line_end: usize,
    /// The chunk's source text.
    pub code: &'a str,
    /// Enclosing-scope path (tests use "mod::foo"-style strings).
    pub scope_chain: &'a str,
    /// Content hash used for deduplication (see `chunk_exists`).
    pub content_hash: &'a str,
}
/// One semantic-search result, rebuilt from a Qdrant point payload by
/// `SearchHit::from_payload`.
#[derive(Debug)]
pub struct SearchHit {
    /// The chunk's source text.
    pub code: String,
    /// File the chunk came from.
    pub file_path: String,
    /// `(line_start, line_end)` as stored in the payload.
    pub line_range: (usize, usize),
    /// Similarity score reported by the vector store.
    pub score: f32,
    /// Syntax-node kind (e.g. "function_item").
    pub node_type: String,
    /// Entity name; `None` when the payload omits it.
    pub entity_name: Option<String>,
    /// Scope chain; empty string when missing from the payload.
    pub scope_chain: String,
}
impl CodeStore {
#[must_use]
pub fn with_ops(ops: QdrantOps, pool: sqlx::SqlitePool) -> Self {
Self {
ops,
collection: CODE_COLLECTION.into(),
pool,
}
}
pub async fn ensure_collection(&self, vector_size: u64) -> Result<()> {
self.ops
.ensure_collection_with_quantization(
&self.collection,
vector_size,
&["language", "file_path", "node_type"],
)
.await?;
Ok(())
}
pub async fn upsert_chunk(&self, chunk: &ChunkInsert<'_>, vector: Vec<f32>) -> Result<String> {
let point_id = uuid::Uuid::new_v4().to_string();
let payload = serde_json::json!({
"file_path": chunk.file_path,
"language": chunk.language,
"node_type": chunk.node_type,
"entity_name": chunk.entity_name,
"line_start": chunk.line_start,
"line_end": chunk.line_end,
"code": chunk.code,
"scope_chain": chunk.scope_chain,
"content_hash": chunk.content_hash,
});
let payload_map = match payload {
serde_json::Value::Object(m) => m.into_iter().collect(),
_ => std::collections::HashMap::new(),
};
VectorStore::upsert(
&self.ops,
&self.collection,
vec![VectorPoint {
id: point_id.clone(),
vector,
payload: payload_map,
}],
)
.await?;
let line_start = i64::try_from(chunk.line_start)?;
let line_end = i64::try_from(chunk.line_end)?;
sqlx::query(
"INSERT OR REPLACE INTO chunk_metadata \
(qdrant_id, file_path, content_hash, line_start, line_end, language, node_type, entity_name) \
VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&point_id)
.bind(chunk.file_path)
.bind(chunk.content_hash)
.bind(line_start)
.bind(line_end)
.bind(chunk.language)
.bind(chunk.node_type)
.bind(chunk.entity_name)
.execute(&self.pool)
.await?;
Ok(point_id)
}
pub async fn chunk_exists(&self, content_hash: &str) -> Result<bool> {
let row: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM chunk_metadata WHERE content_hash = ?")
.bind(content_hash)
.fetch_one(&self.pool)
.await?;
Ok(row.0 > 0)
}
pub async fn remove_file_chunks(&self, file_path: &str) -> Result<usize> {
let ids: Vec<(String,)> =
sqlx::query_as("SELECT qdrant_id FROM chunk_metadata WHERE file_path = ?")
.bind(file_path)
.fetch_all(&self.pool)
.await?;
if ids.is_empty() {
return Ok(0);
}
let point_ids: Vec<String> = ids.iter().map(|(id,)| id.clone()).collect();
VectorStore::delete_by_ids(&self.ops, &self.collection, point_ids).await?;
let count = ids.len();
sqlx::query("DELETE FROM chunk_metadata WHERE file_path = ?")
.bind(file_path)
.execute(&self.pool)
.await?;
Ok(count)
}
pub async fn search(
&self,
query_vector: Vec<f32>,
limit: usize,
language_filter: Option<String>,
) -> Result<Vec<SearchHit>> {
let limit_u64 = u64::try_from(limit)?;
let filter = language_filter.map(|lang| VectorFilter {
must: vec![FieldCondition {
field: "language".into(),
value: FieldValue::Text(lang),
}],
must_not: vec![],
});
let results =
VectorStore::search(&self.ops, &self.collection, query_vector, limit_u64, filter)
.await?;
Ok(results
.into_iter()
.filter_map(|p| SearchHit::from_payload(&p))
.collect())
}
pub async fn indexed_files(&self) -> Result<Vec<String>> {
let rows: Vec<(String,)> = sqlx::query_as("SELECT DISTINCT file_path FROM chunk_metadata")
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(|(p,)| p).collect())
}
}
impl SearchHit {
    /// Rebuilds a hit from a scored Qdrant point's JSON payload.
    ///
    /// Returns `None` when any required field (`code`, `file_path`,
    /// `line_start`, `line_end`, `node_type`) is absent or has the wrong
    /// JSON type; `entity_name` stays optional and `scope_chain` falls
    /// back to the empty string.
    fn from_payload(point: &zeph_memory::ScoredVectorPoint) -> Option<Self> {
        let payload = &point.payload;
        // String-valued payload field, owned.
        let text = |key: &str| -> Option<String> {
            Some(payload.get(key)?.as_str()?.to_owned())
        };
        // Non-negative integer payload field, as usize.
        let line = |key: &str| -> Option<usize> {
            let raw = payload.get(key)?.as_i64()?;
            usize::try_from(raw).ok()
        };
        let code = text("code")?;
        let file_path = text("file_path")?;
        let start = line("line_start")?;
        let end = line("line_end")?;
        let node_type = text("node_type")?;
        Some(Self {
            code,
            file_path,
            line_range: (start, end),
            score: point.score,
            node_type,
            entity_name: text("entity_name"),
            scope_chain: text("scope_chain").unwrap_or_default(),
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use zeph_memory::ScoredVectorPoint;

    /// Builds a scored point with id "test-id" and the given JSON object
    /// as its payload (non-objects fall back to an empty map).
    fn make_scored_point(payload: serde_json::Value, score: f32) -> ScoredVectorPoint {
        let map = match payload {
            serde_json::Value::Object(m) => m.into_iter().collect(),
            _ => std::collections::HashMap::new(),
        };
        ScoredVectorPoint {
            id: "test-id".to_string(),
            score,
            payload: map,
        }
    }

    // Every payload field present -> every SearchHit field populated.
    #[test]
    fn search_hit_from_payload_full() {
        let point = make_scored_point(
            serde_json::json!({
                "code": "fn foo() {}",
                "file_path": "src/lib.rs",
                "line_start": 10,
                "line_end": 12,
                "node_type": "function_item",
                "entity_name": "foo",
                "scope_chain": "mod::foo"
            }),
            0.9,
        );
        let hit = SearchHit::from_payload(&point).unwrap();
        assert_eq!(hit.code, "fn foo() {}");
        assert_eq!(hit.file_path, "src/lib.rs");
        assert_eq!(hit.line_range, (10, 12));
        assert!((hit.score - 0.9).abs() < f32::EPSILON);
        assert_eq!(hit.node_type, "function_item");
        assert_eq!(hit.entity_name, Some("foo".to_string()));
        assert_eq!(hit.scope_chain, "mod::foo");
    }

    // `entity_name` is optional: its absence must not fail the parse.
    #[test]
    fn search_hit_from_payload_no_entity_name() {
        let point = make_scored_point(
            serde_json::json!({
                "code": "struct Bar {}",
                "file_path": "src/bar.rs",
                "line_start": 1,
                "line_end": 3,
                "node_type": "struct_item",
                "scope_chain": ""
            }),
            0.7,
        );
        let hit = SearchHit::from_payload(&point).unwrap();
        assert!(hit.entity_name.is_none());
        assert_eq!(hit.node_type, "struct_item");
    }

    // A missing required field ("code" here) makes parsing return None.
    #[test]
    fn search_hit_from_payload_missing_required_field_returns_none() {
        let point = make_scored_point(
            serde_json::json!({
                "file_path": "src/lib.rs",
                "line_start": 1,
                "line_end": 2,
                "node_type": "function_item"
            }),
            0.5,
        );
        assert!(SearchHit::from_payload(&point).is_none());
    }

    /// In-memory SQLite pool with the project migrations applied, so the
    /// `chunk_metadata` table exists for the async tests below.
    async fn setup_pool() -> sqlx::SqlitePool {
        let pool = sqlx::SqlitePool::connect("sqlite::memory:").await.unwrap();
        zeph_memory::sqlite::SqliteStore::run_migrations(&pool)
            .await
            .unwrap();
        pool
    }

    // Exercises the content-hash lookup SQL directly against SQLite
    // (no Qdrant needed): empty table -> count 0, then one inserted
    // row with the hash -> count > 0.
    #[tokio::test]
    async fn chunk_exists_returns_false_then_true() {
        let pool = setup_pool().await;
        let exists = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM chunk_metadata WHERE content_hash = ?",
        )
        .bind("abc123")
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(exists.0, 0);
        sqlx::query(
            "INSERT INTO chunk_metadata \
             (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
             VALUES (?, ?, ?, ?, ?, ?, ?)",
        )
        .bind("q1")
        .bind("src/main.rs")
        .bind("abc123")
        .bind(1_i64)
        .bind(10_i64)
        .bind("rust")
        .bind("function_item")
        .execute(&pool)
        .await
        .unwrap();
        let exists = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM chunk_metadata WHERE content_hash = ?",
        )
        .bind("abc123")
        .fetch_one(&pool)
        .await
        .unwrap();
        assert!(exists.0 > 0);
    }

    // Exercises the SQLite half of file-chunk removal: select ids by
    // file path, delete by file path, then verify nothing remains.
    #[tokio::test]
    async fn remove_file_chunks_cleans_sqlite() {
        let pool = setup_pool().await;
        // Three chunks for the same file, distinct ids and hashes.
        for i in 0..3 {
            sqlx::query(
                "INSERT INTO chunk_metadata \
                 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
            )
            .bind(format!("q{i}"))
            .bind("src/lib.rs")
            .bind(format!("hash{i}"))
            .bind(1_i64)
            .bind(10_i64)
            .bind("rust")
            .bind("function_item")
            .execute(&pool)
            .await
            .unwrap();
        }
        let ids: Vec<(String,)> =
            sqlx::query_as("SELECT qdrant_id FROM chunk_metadata WHERE file_path = ?")
                .bind("src/lib.rs")
                .fetch_all(&pool)
                .await
                .unwrap();
        assert_eq!(ids.len(), 3);
        sqlx::query("DELETE FROM chunk_metadata WHERE file_path = ?")
            .bind("src/lib.rs")
            .execute(&pool)
            .await
            .unwrap();
        let remaining: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM chunk_metadata WHERE file_path = ?")
                .bind("src/lib.rs")
                .fetch_one(&pool)
                .await
                .unwrap();
        assert_eq!(remaining.0, 0);
    }

    // DISTINCT collapses the duplicate "src/a.rs" entry: three inserts
    // over two paths must list exactly two files.
    #[tokio::test]
    async fn indexed_files_distinct() {
        let pool = setup_pool().await;
        for (i, path) in ["src/a.rs", "src/b.rs", "src/a.rs"].iter().enumerate() {
            sqlx::query(
                "INSERT OR REPLACE INTO chunk_metadata \
                 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
            )
            .bind(format!("q{i}"))
            .bind(path)
            .bind(format!("hash{i}"))
            .bind(1_i64)
            .bind(10_i64)
            .bind("rust")
            .bind("function_item")
            .execute(&pool)
            .await
            .unwrap();
        }
        let rows: Vec<(String,)> = sqlx::query_as("SELECT DISTINCT file_path FROM chunk_metadata")
            .fetch_all(&pool)
            .await
            .unwrap();
        let files: Vec<String> = rows.into_iter().map(|(p,)| p).collect();
        assert_eq!(files.len(), 2);
        assert!(files.contains(&"src/a.rs".to_string()));
        assert!(files.contains(&"src/b.rs".to_string()));
    }
}