1#[allow(unused_imports)]
23use zeph_db::sql;
24use zeph_memory::{FieldCondition, FieldValue, QdrantOps, VectorFilter, VectorPoint, VectorStore};
25
26use zeph_common::{EmbeddingVector, Normalized};
27
28use crate::error::Result;
29
/// Name of the vector collection that [`CodeStore::with_ops`] assigns to every store.
const CODE_COLLECTION: &str = "zeph_code_chunks";
31
/// Storage handle for indexed code chunks.
///
/// Combines a Qdrant client used for the vector points with a database pool that
/// holds the `chunk_metadata` table.
#[derive(Clone)]
pub struct CodeStore {
    /// Client used for all vector-store operations (upsert, delete, search).
    ops: QdrantOps,
    /// Name of the vector collection the points are stored in.
    collection: String,
    /// Connection pool for the `chunk_metadata` table.
    pool: zeph_db::DbPool,
}
51
/// Borrowed description of one code chunk that is about to be stored.
///
/// Every field is either a borrowed string or a plain integer, so the type is cheap
/// to copy and can be printed for diagnostics.
#[derive(Debug, Clone, Copy)]
pub struct ChunkInsert<'a> {
    /// Path of the file the chunk comes from; part of the metadata conflict key.
    pub file_path: &'a str,
    /// Language identifier, stored in both the payload and the metadata row.
    pub language: &'a str,
    /// Syntax node type of the chunk (the tests use values such as `function_item`).
    pub node_type: &'a str,
    /// Name of the entity the chunk defines, when there is one.
    pub entity_name: Option<&'a str>,
    /// First line of the chunk in the source file.
    pub line_start: usize,
    /// Last line of the chunk in the source file.
    pub line_end: usize,
    /// Source text of the chunk; written to the vector payload only.
    pub code: &'a str,
    /// Enclosing scope description; written to the vector payload only.
    pub scope_chain: &'a str,
    /// Hash of the chunk content; together with `file_path` it identifies a metadata row.
    pub content_hash: &'a str,
}
77
/// String newtype naming the kind of syntax node a chunk was extracted from.
///
/// The wrapped string is kept verbatim; no validation is performed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeKind(pub String);

impl std::fmt::Display for NodeKind {
    /// Writes the wrapped string as-is.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(kind) = self;
        f.write_str(kind)
    }
}

impl AsRef<str> for NodeKind {
    /// Borrows the wrapped string.
    fn as_ref(&self) -> &str {
        self.0.as_str()
    }
}

impl From<String> for NodeKind {
    /// Wraps an owned string without copying it.
    fn from(s: String) -> Self {
        NodeKind(s)
    }
}

impl From<&str> for NodeKind {
    /// Copies the borrowed string into a new `NodeKind`.
    fn from(s: &str) -> Self {
        NodeKind(String::from(s))
    }
}
118
/// One result of [`CodeStore::search`], decoded from a scored point's payload.
#[derive(Debug)]
pub struct SearchHit {
    /// Source text of the chunk (payload key `code`).
    pub code: String,
    /// Path of the file the chunk belongs to (payload key `file_path`).
    pub file_path: String,
    /// `(line_start, line_end)` taken from the payload.
    pub line_range: (usize, usize),
    /// Score reported by the vector store for this point.
    pub score: f32,
    /// Syntax node type of the chunk (payload key `node_type`).
    pub node_type: NodeKind,
    /// Language resolved from the payload's `language` string.
    pub language: crate::languages::Lang,
    /// Entity name, when the payload carries one as a string.
    pub entity_name: Option<String>,
    /// Scope chain from the payload; empty when the key is absent.
    pub scope_chain: String,
}
142
143impl CodeStore {
144 #[must_use]
164 pub fn with_ops(ops: QdrantOps, pool: zeph_db::DbPool) -> Self {
165 Self {
166 ops,
167 collection: CODE_COLLECTION.into(),
168 pool,
169 }
170 }
171
172 pub async fn ensure_collection(&self, vector_size: u64) -> Result<()> {
178 self.ops
179 .ensure_collection_with_quantization(
180 &self.collection,
181 vector_size,
182 &["language", "file_path", "node_type"],
183 )
184 .await?;
185 Ok(())
186 }
187
188 #[tracing::instrument(name = "index.store.upsert_chunk", skip_all)]
194 pub async fn upsert_chunk(&self, chunk: &ChunkInsert<'_>, vector: Vec<f32>) -> Result<String> {
195 tracing::Span::current().record("file_path", chunk.file_path);
196 let point_id = uuid::Uuid::new_v4().to_string();
197
198 let payload = serde_json::json!({
199 "file_path": chunk.file_path,
200 "language": chunk.language,
201 "node_type": chunk.node_type,
202 "entity_name": chunk.entity_name,
203 "line_start": chunk.line_start,
204 "line_end": chunk.line_end,
205 "code": chunk.code,
206 "scope_chain": chunk.scope_chain,
207 "content_hash": chunk.content_hash,
208 });
209
210 let payload_map = match payload {
211 serde_json::Value::Object(m) => m.into_iter().collect(),
212 _ => std::collections::HashMap::new(),
213 };
214
215 VectorStore::upsert(
216 &self.ops,
217 &self.collection,
218 vec![VectorPoint {
219 id: point_id.clone(),
220 vector,
221 payload: payload_map,
222 }],
223 )
224 .await?;
225
226 let line_start = i64::try_from(chunk.line_start)?;
227 let line_end = i64::try_from(chunk.line_end)?;
228
229 zeph_db::query(
230 sql!("INSERT INTO chunk_metadata \
231 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type, entity_name) \
232 VALUES (?, ?, ?, ?, ?, ?, ?, ?) \
233 ON CONFLICT(file_path, content_hash) DO UPDATE SET \
234 qdrant_id = excluded.qdrant_id, \
235 line_start = excluded.line_start, line_end = excluded.line_end, \
236 language = excluded.language, node_type = excluded.node_type, \
237 entity_name = excluded.entity_name"),
238 )
239 .bind(&point_id)
240 .bind(chunk.file_path)
241 .bind(chunk.content_hash)
242 .bind(line_start)
243 .bind(line_end)
244 .bind(chunk.language)
245 .bind(chunk.node_type)
246 .bind(chunk.entity_name)
247 .execute(&self.pool)
248 .await?;
249
250 Ok(point_id)
251 }
252
253 #[tracing::instrument(name = "index.store.upsert_chunks_batch", skip_all)]
262 pub async fn upsert_chunks_batch(
263 &self,
264 chunks: Vec<(ChunkInsert<'_>, Vec<f32>)>,
265 ) -> Result<Vec<String>> {
266 tracing::Span::current().record("chunk_count", chunks.len());
267 if chunks.is_empty() {
268 return Ok(Vec::new());
269 }
270
271 let mut point_ids: Vec<String> = Vec::with_capacity(chunks.len());
272 let mut points: Vec<VectorPoint> = Vec::with_capacity(chunks.len());
273
274 for (chunk, vector) in &chunks {
275 let point_id = uuid::Uuid::new_v4().to_string();
276
277 let payload = serde_json::json!({
278 "file_path": chunk.file_path,
279 "language": chunk.language,
280 "node_type": chunk.node_type,
281 "entity_name": chunk.entity_name,
282 "line_start": chunk.line_start,
283 "line_end": chunk.line_end,
284 "code": chunk.code,
285 "scope_chain": chunk.scope_chain,
286 "content_hash": chunk.content_hash,
287 });
288
289 let payload_map = match payload {
290 serde_json::Value::Object(m) => m.into_iter().collect(),
291 _ => std::collections::HashMap::new(),
292 };
293
294 points.push(VectorPoint {
295 id: point_id.clone(),
296 vector: vector.clone(),
297 payload: payload_map,
298 });
299 point_ids.push(point_id);
300 }
301
302 VectorStore::upsert(&self.ops, &self.collection, points).await?;
303
304 let mut tx = self.pool.begin().await?;
305 for (idx, (chunk, _)) in chunks.iter().enumerate() {
306 let point_id = &point_ids[idx];
307 let line_start = i64::try_from(chunk.line_start)?;
308 let line_end = i64::try_from(chunk.line_end)?;
309
310 zeph_db::query(
311 sql!("INSERT INTO chunk_metadata \
312 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type, entity_name) \
313 VALUES (?, ?, ?, ?, ?, ?, ?, ?) \
314 ON CONFLICT(file_path, content_hash) DO UPDATE SET \
315 qdrant_id = excluded.qdrant_id, \
316 line_start = excluded.line_start, line_end = excluded.line_end, \
317 language = excluded.language, node_type = excluded.node_type, \
318 entity_name = excluded.entity_name"),
319 )
320 .bind(point_id)
321 .bind(chunk.file_path)
322 .bind(chunk.content_hash)
323 .bind(line_start)
324 .bind(line_end)
325 .bind(chunk.language)
326 .bind(chunk.node_type)
327 .bind(chunk.entity_name)
328 .execute(&mut *tx)
329 .await?;
330 }
331 tx.commit().await?;
332
333 Ok(point_ids)
334 }
335
336 #[tracing::instrument(name = "index.store.chunk_exists", skip_all, fields(%content_hash))]
342 pub async fn chunk_exists(&self, content_hash: &str) -> Result<bool> {
343 let row: (i64,) = zeph_db::query_as(sql!(
344 "SELECT COUNT(*) FROM chunk_metadata WHERE content_hash = ?"
345 ))
346 .bind(content_hash)
347 .fetch_one(&self.pool)
348 .await?;
349 Ok(row.0 > 0)
350 }
351
352 #[tracing::instrument(name = "index.store.existing_hashes", skip_all)]
361 pub async fn existing_hashes(
362 &self,
363 hashes: &[&str],
364 ) -> Result<std::collections::HashSet<String>> {
365 tracing::Span::current().record("hash_count", hashes.len());
366 if hashes.is_empty() {
367 return Ok(std::collections::HashSet::new());
368 }
369
370 let mut result = std::collections::HashSet::new();
371
372 for chunk in hashes.chunks(900) {
373 let placeholders = std::iter::repeat_n("?", chunk.len())
374 .collect::<Vec<_>>()
375 .join(", ");
376 let sql = format!(
377 "SELECT content_hash FROM chunk_metadata WHERE content_hash IN ({placeholders})"
378 );
379 let mut query = zeph_db::query_scalar::<_, String>(&sql);
380 for hash in chunk {
381 query = query.bind(*hash);
382 }
383 let rows: Vec<String> = query.fetch_all(&self.pool).await?;
384 result.extend(rows);
385 }
386
387 Ok(result)
388 }
389
390 #[tracing::instrument(name = "index.store.remove_file_chunks", skip_all)]
396 pub async fn remove_file_chunks(&self, file_path: &str) -> Result<usize> {
397 tracing::Span::current().record("file_path", file_path);
398 let ids: Vec<(String,)> = zeph_db::query_as(sql!(
399 "SELECT qdrant_id FROM chunk_metadata WHERE file_path = ?"
400 ))
401 .bind(file_path)
402 .fetch_all(&self.pool)
403 .await?;
404
405 if ids.is_empty() {
406 return Ok(0);
407 }
408
409 let point_ids: Vec<String> = ids.iter().map(|(id,)| id.clone()).collect();
410
411 VectorStore::delete_by_ids(&self.ops, &self.collection, point_ids).await?;
412
413 let count = ids.len();
414 zeph_db::query(sql!("DELETE FROM chunk_metadata WHERE file_path = ?"))
415 .bind(file_path)
416 .execute(&self.pool)
417 .await?;
418
419 Ok(count)
420 }
421
422 #[tracing::instrument(name = "index.store.search", skip_all)]
436 pub async fn search(
437 &self,
438 query_vector: EmbeddingVector<Normalized>,
439 limit: usize,
440 language_filter: Option<String>,
441 ) -> Result<Vec<SearchHit>> {
442 let limit_u64 = u64::try_from(limit)?;
443 let filter = language_filter.map(|lang| VectorFilter {
444 must: vec![FieldCondition {
445 field: "language".into(),
446 value: FieldValue::Text(lang),
447 }],
448 must_not: vec![],
449 });
450
451 let results = VectorStore::search(
452 &self.ops,
453 &self.collection,
454 query_vector.into_inner(),
455 limit_u64,
456 filter,
457 )
458 .await?;
459
460 Ok(results
461 .into_iter()
462 .filter_map(|p| SearchHit::from_payload(&p))
463 .collect())
464 }
465
466 #[tracing::instrument(name = "index.store.indexed_files", skip_all)]
472 pub async fn indexed_files(&self) -> Result<Vec<String>> {
473 let rows: Vec<(String,)> =
474 zeph_db::query_as(sql!("SELECT DISTINCT file_path FROM chunk_metadata"))
475 .fetch_all(&self.pool)
476 .await?;
477 Ok(rows.into_iter().map(|(p,)| p).collect())
478 }
479}
480
481impl SearchHit {
482 fn from_payload(point: &zeph_memory::ScoredVectorPoint) -> Option<Self> {
483 let get_str = |key: &str| -> Option<String> {
484 point
485 .payload
486 .get(key)
487 .and_then(serde_json::Value::as_str)
488 .map(ToOwned::to_owned)
489 };
490 let get_usize = |key: &str| -> Option<usize> {
491 point
492 .payload
493 .get(key)
494 .and_then(serde_json::Value::as_i64)
495 .and_then(|v| usize::try_from(v).ok())
496 };
497
498 let language_str = get_str("language")?;
499 let language = crate::languages::Lang::from_id(&language_str)?;
500 Some(Self {
501 code: get_str("code")?,
502 file_path: get_str("file_path")?,
503 line_range: (get_usize("line_start")?, get_usize("line_end")?),
504 score: point.score,
505 node_type: NodeKind::from(get_str("node_type")?),
506 language,
507 entity_name: get_str("entity_name"),
508 scope_chain: get_str("scope_chain").unwrap_or_default(),
509 })
510 }
511}
512
513#[cfg(test)]
514mod tests {
515 use super::*;
516 use zeph_memory::ScoredVectorPoint;
517
518 fn make_scored_point(payload: serde_json::Value, score: f32) -> ScoredVectorPoint {
519 let map = match payload {
520 serde_json::Value::Object(m) => m.into_iter().collect(),
521 _ => std::collections::HashMap::new(),
522 };
523 ScoredVectorPoint {
524 id: "test-id".to_string(),
525 score,
526 payload: map,
527 }
528 }
529
530 #[test]
531 fn search_hit_from_payload_full() {
532 let point = make_scored_point(
533 serde_json::json!({
534 "code": "fn foo() {}",
535 "file_path": "src/lib.rs",
536 "line_start": 10,
537 "line_end": 12,
538 "language": "rust",
539 "node_type": "function_item",
540 "entity_name": "foo",
541 "scope_chain": "mod::foo"
542 }),
543 0.9,
544 );
545 let hit = SearchHit::from_payload(&point).unwrap();
546 assert_eq!(hit.code, "fn foo() {}");
547 assert_eq!(hit.file_path, "src/lib.rs");
548 assert_eq!(hit.line_range, (10, 12));
549 assert!((hit.score - 0.9).abs() < f32::EPSILON);
550 assert_eq!(hit.node_type.as_ref(), "function_item");
551 assert_eq!(hit.language, crate::languages::Lang::Rust);
552 assert_eq!(hit.entity_name, Some("foo".to_string()));
553 assert_eq!(hit.scope_chain, "mod::foo");
554 }
555
556 #[test]
557 fn search_hit_from_payload_no_entity_name() {
558 let point = make_scored_point(
559 serde_json::json!({
560 "code": "struct Bar {}",
561 "file_path": "src/bar.rs",
562 "line_start": 1,
563 "line_end": 3,
564 "language": "rust",
565 "node_type": "struct_item",
566 "scope_chain": ""
567 }),
568 0.7,
569 );
570 let hit = SearchHit::from_payload(&point).unwrap();
571 assert!(hit.entity_name.is_none());
572 assert_eq!(hit.node_type.as_ref(), "struct_item");
573 }
574
575 #[test]
576 fn search_hit_from_payload_missing_required_field_returns_none() {
577 let point = make_scored_point(
579 serde_json::json!({
580 "file_path": "src/lib.rs",
581 "line_start": 1,
582 "line_end": 2,
583 "language": "rust",
584 "node_type": "function_item"
585 }),
586 0.5,
587 );
588 assert!(SearchHit::from_payload(&point).is_none());
589 }
590
591 async fn setup_pool() -> zeph_db::DbPool {
592 zeph_db::DbConfig {
593 url: ":memory:".to_string(),
594 ..Default::default()
595 }
596 .connect()
597 .await
598 .unwrap()
599 }
600
601 #[tokio::test]
602 async fn chunk_exists_returns_false_then_true() {
603 let pool = setup_pool().await;
604
605 let exists = zeph_db::query_as::<_, (i64,)>(sql!(
606 "SELECT COUNT(*) FROM chunk_metadata WHERE content_hash = ?"
607 ))
608 .bind("abc123")
609 .fetch_one(&pool)
610 .await
611 .unwrap();
612 assert_eq!(exists.0, 0);
613
614 zeph_db::query(sql!(
615 "INSERT INTO chunk_metadata \
616 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
617 VALUES (?, ?, ?, ?, ?, ?, ?)"
618 ))
619 .bind("q1")
620 .bind("src/main.rs")
621 .bind("abc123")
622 .bind(1_i64)
623 .bind(10_i64)
624 .bind("rust")
625 .bind("function_item")
626 .execute(&pool)
627 .await
628 .unwrap();
629
630 let exists = zeph_db::query_as::<_, (i64,)>(sql!(
631 "SELECT COUNT(*) FROM chunk_metadata WHERE content_hash = ?"
632 ))
633 .bind("abc123")
634 .fetch_one(&pool)
635 .await
636 .unwrap();
637 assert!(exists.0 > 0);
638 }
639
640 #[tokio::test]
641 async fn remove_file_chunks_cleans_sqlite() {
642 let pool = setup_pool().await;
643
644 for i in 0..3 {
645 zeph_db::query(sql!(
646 "INSERT INTO chunk_metadata \
647 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
648 VALUES (?, ?, ?, ?, ?, ?, ?)"
649 ))
650 .bind(format!("q{i}"))
651 .bind("src/lib.rs")
652 .bind(format!("hash{i}"))
653 .bind(1_i64)
654 .bind(10_i64)
655 .bind("rust")
656 .bind("function_item")
657 .execute(&pool)
658 .await
659 .unwrap();
660 }
661
662 let ids: Vec<(String,)> = zeph_db::query_as(sql!(
663 "SELECT qdrant_id FROM chunk_metadata WHERE file_path = ?"
664 ))
665 .bind("src/lib.rs")
666 .fetch_all(&pool)
667 .await
668 .unwrap();
669 assert_eq!(ids.len(), 3);
670
671 zeph_db::query(sql!("DELETE FROM chunk_metadata WHERE file_path = ?"))
672 .bind("src/lib.rs")
673 .execute(&pool)
674 .await
675 .unwrap();
676
677 let remaining: (i64,) = zeph_db::query_as(sql!(
678 "SELECT COUNT(*) FROM chunk_metadata WHERE file_path = ?"
679 ))
680 .bind("src/lib.rs")
681 .fetch_one(&pool)
682 .await
683 .unwrap();
684 assert_eq!(remaining.0, 0);
685 }
686
687 #[tokio::test]
688 async fn indexed_files_distinct() {
689 let pool = setup_pool().await;
690
691 for (i, path) in ["src/a.rs", "src/b.rs", "src/a.rs"].iter().enumerate() {
692 zeph_db::query(sql!(
693 "INSERT INTO chunk_metadata \
694 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
695 VALUES (?, ?, ?, ?, ?, ?, ?) \
696 ON CONFLICT(qdrant_id) DO UPDATE SET \
697 file_path = excluded.file_path, content_hash = excluded.content_hash, \
698 line_start = excluded.line_start, line_end = excluded.line_end, \
699 language = excluded.language, node_type = excluded.node_type"
700 ))
701 .bind(format!("q{i}"))
702 .bind(path)
703 .bind(format!("hash{i}"))
704 .bind(1_i64)
705 .bind(10_i64)
706 .bind("rust")
707 .bind("function_item")
708 .execute(&pool)
709 .await
710 .unwrap();
711 }
712
713 let rows: Vec<(String,)> =
714 zeph_db::query_as(sql!("SELECT DISTINCT file_path FROM chunk_metadata"))
715 .fetch_all(&pool)
716 .await
717 .unwrap();
718 let files: Vec<String> = rows.into_iter().map(|(p,)| p).collect();
719 assert_eq!(files.len(), 2);
720 assert!(files.contains(&"src/a.rs".to_string()));
721 assert!(files.contains(&"src/b.rs".to_string()));
722 }
723
724 #[tokio::test]
728 async fn upsert_same_file_path_and_hash_is_idempotent() {
729 let pool = setup_pool().await;
730
731 for i in 0..2_u32 {
732 zeph_db::query(sql!(
733 "INSERT INTO chunk_metadata \
734 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
735 VALUES (?, ?, ?, ?, ?, ?, ?) \
736 ON CONFLICT(file_path, content_hash) DO UPDATE SET \
737 qdrant_id = excluded.qdrant_id, \
738 line_start = excluded.line_start, line_end = excluded.line_end, \
739 language = excluded.language, node_type = excluded.node_type, \
740 entity_name = excluded.entity_name"
741 ))
742 .bind(format!("q{i}"))
743 .bind("src/lib.rs")
744 .bind("dedup_hash")
745 .bind(1_i64)
746 .bind(5_i64)
747 .bind("rust")
748 .bind("function_item")
749 .execute(&pool)
750 .await
751 .unwrap();
752 }
753
754 let count: (i64,) = zeph_db::query_as(sql!(
755 "SELECT COUNT(*) FROM chunk_metadata \
756 WHERE file_path = 'src/lib.rs' AND content_hash = 'dedup_hash'"
757 ))
758 .fetch_one(&pool)
759 .await
760 .unwrap();
761
762 assert_eq!(count.0, 1, "duplicate upsert must not produce a second row");
763
764 let qdrant_id: (String,) = zeph_db::query_as(sql!(
766 "SELECT qdrant_id FROM chunk_metadata \
767 WHERE file_path = 'src/lib.rs' AND content_hash = 'dedup_hash'"
768 ))
769 .fetch_one(&pool)
770 .await
771 .unwrap();
772 assert_eq!(
773 qdrant_id.0, "q1",
774 "qdrant_id must reflect the latest upsert"
775 );
776 }
777
778 #[tokio::test]
779 async fn existing_hashes_empty_input_returns_empty_set() {
780 let pool = setup_pool().await;
781 let ops = zeph_memory::QdrantOps::new("http://127.0.0.1:1", None).unwrap();
782 let store = CodeStore::with_ops(ops, pool);
783 let result = store.existing_hashes(&[]).await.unwrap();
784 assert!(result.is_empty());
785 }
786
787 #[tokio::test]
788 async fn existing_hashes_chunking_above_900() {
789 let pool = setup_pool().await;
790
791 for i in 0..901_usize {
793 zeph_db::query(sql!(
794 "INSERT INTO chunk_metadata \
795 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
796 VALUES (?, ?, ?, ?, ?, ?, ?)"
797 ))
798 .bind(format!("q{i}"))
799 .bind("src/lib.rs")
800 .bind(format!("hash{i:04}"))
801 .bind(1_i64)
802 .bind(2_i64)
803 .bind("rust")
804 .bind("function_item")
805 .execute(&pool)
806 .await
807 .unwrap();
808 }
809
810 let all_hashes: Vec<String> = (0..901).map(|i| format!("hash{i:04}")).collect();
811 let refs: Vec<&str> = all_hashes.iter().map(String::as_str).collect();
812
813 let ops = zeph_memory::QdrantOps::new("http://127.0.0.1:1", None).unwrap();
814 let store = CodeStore::with_ops(ops, pool);
815 let result = store.existing_hashes(&refs).await.unwrap();
816
817 assert_eq!(result.len(), 901);
818 assert!(result.contains("hash0000"));
820 assert!(result.contains("hash0900"));
821 }
822}