Skip to main content

zeph_index/
store.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Qdrant collection + `SQLite` metadata for code chunks.
5//!
6//! [`CodeStore`] is a **dual-write store**: every chunk is simultaneously stored as
7//! a vector point in Qdrant (for similarity search) and as a metadata row in `SQLite`
8//! (for exact-hash deduplication and file-path bookkeeping).
9//!
10//! ## Why dual-write?
11//!
//! Qdrant does not expose a cheap "does this hash exist?" query, so `SQLite` acts as a
//! fast lookup table. Before embedding a file the indexer can pass that file's chunk
//! hashes to [`CodeStore::existing_hashes`], which looks them up in `SQLite` with
//! `IN (…)` queries (batched 900 hashes at a time); only chunks whose hash is absent
//! need to be sent to the embedding model.
16//!
17//! ## Collection name
18//!
19//! The Qdrant collection is always named `"zeph_code_chunks"`. The `SQLite` table is
20//! `chunk_metadata`, created by the `zeph-db` migration layer at startup.
21
22#[allow(unused_imports)]
23use zeph_db::sql;
24use zeph_memory::{FieldCondition, FieldValue, QdrantOps, VectorFilter, VectorPoint, VectorStore};
25
26use zeph_common::{EmbeddingVector, Normalized};
27
28use crate::error::Result;
29
/// Name of the Qdrant collection that every [`CodeStore`] reads from and writes to.
const CODE_COLLECTION: &str = "zeph_code_chunks";
31
/// Qdrant + `SQLite` dual-write store for code chunks.
///
/// `CodeStore` is the persistence layer for the indexing pipeline. It derives
/// [`Clone`]; a clone copies the `ops` and `pool` handles and allocates a new
/// `String` for the collection name.
///
/// NOTE(review): `QdrantOps` and `DbPool` are presumably reference-counted handles,
/// which is what would make clones cheap and safe to share across async tasks —
/// confirm against their definitions.
///
/// # Lifecycle
///
/// 1. Call [`CodeStore::with_ops`] to construct.
/// 2. Call [`CodeStore::ensure_collection`] once at startup to create the Qdrant
///    collection if it does not yet exist.
/// 3. Use [`CodeStore::upsert_chunks_batch`] during indexing and [`CodeStore::search`]
///    during retrieval.
#[derive(Clone)]
pub struct CodeStore {
    /// Qdrant operations handle used for every vector upsert, delete and search.
    ops: QdrantOps,
    /// Name of the Qdrant collection targeted by all vector operations.
    collection: String,
    /// Database pool used for the `chunk_metadata` table.
    pool: zeph_db::DbPool,
}
51
/// Borrowed parameters for inserting a single code chunk.
///
/// All string fields are borrowed to avoid cloning the source data during batch
/// construction. [`CodeStore::upsert_chunk`] takes the struct by reference, while
/// [`CodeStore::upsert_chunks_batch`] takes it by value.
///
/// Every field is `Copy`, so the struct itself is `Copy`; duplicating it never
/// allocates.
#[derive(Debug, Clone, Copy)]
pub struct ChunkInsert<'a> {
    /// Relative path from the project root (e.g. `"src/lib.rs"`).
    pub file_path: &'a str,
    /// Language identifier (e.g. `"rust"`). See [`crate::languages::Lang::id`].
    pub language: &'a str,
    /// Tree-sitter node kind (e.g. `"function_item"`).
    pub node_type: &'a str,
    /// Optional symbol name extracted by the chunker.
    pub entity_name: Option<&'a str>,
    /// 1-based inclusive start line.
    pub line_start: usize,
    /// 1-based inclusive end line.
    pub line_end: usize,
    /// Raw source text of the chunk.
    pub code: &'a str,
    /// `">"` separated scope nesting path.
    pub scope_chain: &'a str,
    /// Blake3 hex digest of `code`.
    pub content_hash: &'a str,
}
77
/// Tree-sitter node kind carried in the Qdrant payload, such as `"function_item"`
/// or `"struct_item"`.
///
/// This is a newtype wrapper around `String`. It implements `Display`,
/// `AsRef<str>`, `From<String>` and `From<&str>` so it can be dropped into format
/// strings and compared against string slices without unwrapping by hand.
///
/// # Examples
///
/// ```
/// use zeph_index::store::NodeKind;
///
/// let kind = NodeKind::from("function_item");
/// assert_eq!(kind.as_ref(), "function_item");
/// assert_eq!(kind.to_string(), "function_item");
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeKind(pub String);

impl std::fmt::Display for NodeKind {
    /// Writes the wrapped string verbatim; width/alignment flags are not applied.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str(self.0.as_str())
    }
}

impl AsRef<str> for NodeKind {
    /// Borrows the wrapped node kind as a string slice.
    fn as_ref(&self) -> &str {
        self.0.as_str()
    }
}

impl From<String> for NodeKind {
    /// Wraps an owned string without copying it.
    fn from(kind: String) -> Self {
        NodeKind(kind)
    }
}

impl From<&str> for NodeKind {
    /// Copies the slice into a newly allocated `String` and wraps it.
    fn from(kind: &str) -> Self {
        NodeKind(String::from(kind))
    }
}
118
/// A single search result returned by [`CodeStore::search`].
///
/// Decoded from the Qdrant vector point payload by `SearchHit::from_payload`.
/// Points whose payload is missing a required field (`code`, `file_path`,
/// `line_start`, `line_end`, `node_type`, or a recognised `language`) are silently
/// dropped. `entity_name` is optional, and a missing `scope_chain` decodes to an
/// empty string.
#[derive(Debug)]
pub struct SearchHit {
    /// Raw source text of the matching chunk.
    pub code: String,
    /// Relative file path from the project root.
    pub file_path: String,
    /// 1-based inclusive `(start_line, end_line)` within the file.
    pub line_range: (usize, usize),
    /// Similarity score copied from the scored point returned by Qdrant
    /// (cosine similarity; higher is more similar).
    pub score: f32,
    /// Tree-sitter node kind of the primary AST node.
    pub node_type: NodeKind,
    /// Programming language of the chunk.
    pub language: crate::languages::Lang,
    /// Symbol name, if available.
    pub entity_name: Option<String>,
    /// `">"` separated scope chain; empty when the payload carries none.
    pub scope_chain: String,
}
142
143impl CodeStore {
144    /// Create a `CodeStore` from a pre-built [`QdrantOps`] instance and a `SQLite` pool.
145    ///
146    /// The Qdrant collection is not created here — call [`CodeStore::ensure_collection`]
147    /// before performing any upserts.
148    ///
149    /// # Examples
150    ///
151    /// ```no_run
152    /// use zeph_index::store::CodeStore;
153    /// use zeph_memory::QdrantOps;
154    /// # async fn example() -> zeph_index::Result<()> {
155    /// # let pool: zeph_db::DbPool = panic!("placeholder");
156    ///
157    /// let ops = QdrantOps::new("http://localhost:6334", None).unwrap();
158    /// let store = CodeStore::with_ops(ops, pool);
159    /// store.ensure_collection(1536).await?;
160    /// # Ok(())
161    /// # }
162    /// ```
163    #[must_use]
164    pub fn with_ops(ops: QdrantOps, pool: zeph_db::DbPool) -> Self {
165        Self {
166            ops,
167            collection: CODE_COLLECTION.into(),
168            pool,
169        }
170    }
171
172    /// Create collection with INT8 scalar quantization if it doesn't exist.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if `Qdrant` operations fail.
177    pub async fn ensure_collection(&self, vector_size: u64) -> Result<()> {
178        self.ops
179            .ensure_collection_with_quantization(
180                &self.collection,
181                vector_size,
182                &["language", "file_path", "node_type"],
183            )
184            .await?;
185        Ok(())
186    }
187
188    /// Upsert a code chunk into both `Qdrant` and `SQLite`.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if `Qdrant` or `SQLite` operations fail.
193    #[tracing::instrument(name = "index.store.upsert_chunk", skip_all)]
194    pub async fn upsert_chunk(&self, chunk: &ChunkInsert<'_>, vector: Vec<f32>) -> Result<String> {
195        tracing::Span::current().record("file_path", chunk.file_path);
196        let point_id = uuid::Uuid::new_v4().to_string();
197
198        let payload = serde_json::json!({
199            "file_path": chunk.file_path,
200            "language": chunk.language,
201            "node_type": chunk.node_type,
202            "entity_name": chunk.entity_name,
203            "line_start": chunk.line_start,
204            "line_end": chunk.line_end,
205            "code": chunk.code,
206            "scope_chain": chunk.scope_chain,
207            "content_hash": chunk.content_hash,
208        });
209
210        let payload_map = match payload {
211            serde_json::Value::Object(m) => m.into_iter().collect(),
212            _ => std::collections::HashMap::new(),
213        };
214
215        VectorStore::upsert(
216            &self.ops,
217            &self.collection,
218            vec![VectorPoint {
219                id: point_id.clone(),
220                vector,
221                payload: payload_map,
222            }],
223        )
224        .await?;
225
226        let line_start = i64::try_from(chunk.line_start)?;
227        let line_end = i64::try_from(chunk.line_end)?;
228
229        zeph_db::query(
230            sql!("INSERT INTO chunk_metadata \
231             (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type, entity_name) \
232             VALUES (?, ?, ?, ?, ?, ?, ?, ?) \
233             ON CONFLICT(file_path, content_hash) DO UPDATE SET \
234               qdrant_id = excluded.qdrant_id, \
235               line_start = excluded.line_start, line_end = excluded.line_end, \
236               language = excluded.language, node_type = excluded.node_type, \
237               entity_name = excluded.entity_name"),
238        )
239        .bind(&point_id)
240        .bind(chunk.file_path)
241        .bind(chunk.content_hash)
242        .bind(line_start)
243        .bind(line_end)
244        .bind(chunk.language)
245        .bind(chunk.node_type)
246        .bind(chunk.entity_name)
247        .execute(&self.pool)
248        .await?;
249
250        Ok(point_id)
251    }
252
253    /// Upsert multiple chunks into both `Qdrant` and `SQLite` in a single batch.
254    ///
255    /// All vector points are sent to `Qdrant` in one request and all metadata rows are inserted
256    /// in a single `SQLite` transaction, reducing per-chunk overhead during full-project indexing.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if `Qdrant` or `SQLite` operations fail.
261    #[tracing::instrument(name = "index.store.upsert_chunks_batch", skip_all)]
262    pub async fn upsert_chunks_batch(
263        &self,
264        chunks: Vec<(ChunkInsert<'_>, Vec<f32>)>,
265    ) -> Result<Vec<String>> {
266        tracing::Span::current().record("chunk_count", chunks.len());
267        if chunks.is_empty() {
268            return Ok(Vec::new());
269        }
270
271        let mut point_ids: Vec<String> = Vec::with_capacity(chunks.len());
272        let mut points: Vec<VectorPoint> = Vec::with_capacity(chunks.len());
273
274        for (chunk, vector) in &chunks {
275            let point_id = uuid::Uuid::new_v4().to_string();
276
277            let payload = serde_json::json!({
278                "file_path": chunk.file_path,
279                "language": chunk.language,
280                "node_type": chunk.node_type,
281                "entity_name": chunk.entity_name,
282                "line_start": chunk.line_start,
283                "line_end": chunk.line_end,
284                "code": chunk.code,
285                "scope_chain": chunk.scope_chain,
286                "content_hash": chunk.content_hash,
287            });
288
289            let payload_map = match payload {
290                serde_json::Value::Object(m) => m.into_iter().collect(),
291                _ => std::collections::HashMap::new(),
292            };
293
294            points.push(VectorPoint {
295                id: point_id.clone(),
296                vector: vector.clone(),
297                payload: payload_map,
298            });
299            point_ids.push(point_id);
300        }
301
302        VectorStore::upsert(&self.ops, &self.collection, points).await?;
303
304        let mut tx = self.pool.begin().await?;
305        for (idx, (chunk, _)) in chunks.iter().enumerate() {
306            let point_id = &point_ids[idx];
307            let line_start = i64::try_from(chunk.line_start)?;
308            let line_end = i64::try_from(chunk.line_end)?;
309
310            zeph_db::query(
311                sql!("INSERT INTO chunk_metadata \
312                 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type, entity_name) \
313                 VALUES (?, ?, ?, ?, ?, ?, ?, ?) \
314                 ON CONFLICT(file_path, content_hash) DO UPDATE SET \
315                   qdrant_id = excluded.qdrant_id, \
316                   line_start = excluded.line_start, line_end = excluded.line_end, \
317                   language = excluded.language, node_type = excluded.node_type, \
318                   entity_name = excluded.entity_name"),
319            )
320            .bind(point_id)
321            .bind(chunk.file_path)
322            .bind(chunk.content_hash)
323            .bind(line_start)
324            .bind(line_end)
325            .bind(chunk.language)
326            .bind(chunk.node_type)
327            .bind(chunk.entity_name)
328            .execute(&mut *tx)
329            .await?;
330        }
331        tx.commit().await?;
332
333        Ok(point_ids)
334    }
335
336    /// Check if a chunk with this content hash already exists.
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if the `SQLite` query fails.
341    #[tracing::instrument(name = "index.store.chunk_exists", skip_all, fields(%content_hash))]
342    pub async fn chunk_exists(&self, content_hash: &str) -> Result<bool> {
343        let row: (i64,) = zeph_db::query_as(sql!(
344            "SELECT COUNT(*) FROM chunk_metadata WHERE content_hash = ?"
345        ))
346        .bind(content_hash)
347        .fetch_one(&self.pool)
348        .await?;
349        Ok(row.0 > 0)
350    }
351
352    /// Return the set of content hashes that already exist in the store.
353    ///
354    /// Uses `WHERE content_hash IN (...)` with chunks of 900 to stay below
355    /// `SQLite`'s default variable limit of 999.
356    ///
357    /// # Errors
358    ///
359    /// Returns an error if the `SQLite` query fails.
360    #[tracing::instrument(name = "index.store.existing_hashes", skip_all)]
361    pub async fn existing_hashes(
362        &self,
363        hashes: &[&str],
364    ) -> Result<std::collections::HashSet<String>> {
365        tracing::Span::current().record("hash_count", hashes.len());
366        if hashes.is_empty() {
367            return Ok(std::collections::HashSet::new());
368        }
369
370        let mut result = std::collections::HashSet::new();
371
372        for chunk in hashes.chunks(900) {
373            let placeholders = std::iter::repeat_n("?", chunk.len())
374                .collect::<Vec<_>>()
375                .join(", ");
376            let sql = format!(
377                "SELECT content_hash FROM chunk_metadata WHERE content_hash IN ({placeholders})"
378            );
379            let mut query = zeph_db::query_scalar::<_, String>(&sql);
380            for hash in chunk {
381                query = query.bind(*hash);
382            }
383            let rows: Vec<String> = query.fetch_all(&self.pool).await?;
384            result.extend(rows);
385        }
386
387        Ok(result)
388    }
389
390    /// Remove all chunks for a given file path from both stores.
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if `Qdrant` or `SQLite` operations fail.
395    #[tracing::instrument(name = "index.store.remove_file_chunks", skip_all)]
396    pub async fn remove_file_chunks(&self, file_path: &str) -> Result<usize> {
397        tracing::Span::current().record("file_path", file_path);
398        let ids: Vec<(String,)> = zeph_db::query_as(sql!(
399            "SELECT qdrant_id FROM chunk_metadata WHERE file_path = ?"
400        ))
401        .bind(file_path)
402        .fetch_all(&self.pool)
403        .await?;
404
405        if ids.is_empty() {
406            return Ok(0);
407        }
408
409        let point_ids: Vec<String> = ids.iter().map(|(id,)| id.clone()).collect();
410
411        VectorStore::delete_by_ids(&self.ops, &self.collection, point_ids).await?;
412
413        let count = ids.len();
414        zeph_db::query(sql!("DELETE FROM chunk_metadata WHERE file_path = ?"))
415            .bind(file_path)
416            .execute(&self.pool)
417            .await?;
418
419        Ok(count)
420    }
421
422    /// Search for similar code chunks.
423    ///
424    /// The `query_vector` must be L2-normalized (use
425    /// [`EmbeddingVector::<Unnormalized>::normalize`](zeph_common::EmbeddingVector::normalize)
426    /// or
427    /// [`EmbeddingVector::<Normalized>::new_normalized`](zeph_common::EmbeddingVector::new_normalized)
428    /// before calling). Requiring [`Normalized`] at the type level prevents silent
429    /// near-zero cosine scores that Qdrant gRPC returns for mismatched or
430    /// unnormalized vectors.
431    ///
432    /// # Errors
433    ///
434    /// Returns an error if `Qdrant` search fails.
435    #[tracing::instrument(name = "index.store.search", skip_all)]
436    pub async fn search(
437        &self,
438        query_vector: EmbeddingVector<Normalized>,
439        limit: usize,
440        language_filter: Option<String>,
441    ) -> Result<Vec<SearchHit>> {
442        let limit_u64 = u64::try_from(limit)?;
443        let filter = language_filter.map(|lang| VectorFilter {
444            must: vec![FieldCondition {
445                field: "language".into(),
446                value: FieldValue::Text(lang),
447            }],
448            must_not: vec![],
449        });
450
451        let results = VectorStore::search(
452            &self.ops,
453            &self.collection,
454            query_vector.into_inner(),
455            limit_u64,
456            filter,
457        )
458        .await?;
459
460        Ok(results
461            .into_iter()
462            .filter_map(|p| SearchHit::from_payload(&p))
463            .collect())
464    }
465
466    /// List all indexed file paths.
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if the `SQLite` query fails.
471    #[tracing::instrument(name = "index.store.indexed_files", skip_all)]
472    pub async fn indexed_files(&self) -> Result<Vec<String>> {
473        let rows: Vec<(String,)> =
474            zeph_db::query_as(sql!("SELECT DISTINCT file_path FROM chunk_metadata"))
475                .fetch_all(&self.pool)
476                .await?;
477        Ok(rows.into_iter().map(|(p,)| p).collect())
478    }
479}
480
481impl SearchHit {
482    fn from_payload(point: &zeph_memory::ScoredVectorPoint) -> Option<Self> {
483        let get_str = |key: &str| -> Option<String> {
484            point
485                .payload
486                .get(key)
487                .and_then(serde_json::Value::as_str)
488                .map(ToOwned::to_owned)
489        };
490        let get_usize = |key: &str| -> Option<usize> {
491            point
492                .payload
493                .get(key)
494                .and_then(serde_json::Value::as_i64)
495                .and_then(|v| usize::try_from(v).ok())
496        };
497
498        let language_str = get_str("language")?;
499        let language = crate::languages::Lang::from_id(&language_str)?;
500        Some(Self {
501            code: get_str("code")?,
502            file_path: get_str("file_path")?,
503            line_range: (get_usize("line_start")?, get_usize("line_end")?),
504            score: point.score,
505            node_type: NodeKind::from(get_str("node_type")?),
506            language,
507            entity_name: get_str("entity_name"),
508            scope_chain: get_str("scope_chain").unwrap_or_default(),
509        })
510    }
511}
512
#[cfg(test)]
mod tests {
    use super::*;
    use zeph_memory::ScoredVectorPoint;

    /// Build a scored point from a JSON value; anything other than an object
    /// yields an empty payload.
    fn make_scored_point(payload: serde_json::Value, score: f32) -> ScoredVectorPoint {
        let map = match payload {
            serde_json::Value::Object(m) => m.into_iter().collect(),
            _ => std::collections::HashMap::new(),
        };
        ScoredVectorPoint {
            id: "test-id".to_string(),
            score,
            payload: map,
        }
    }

    #[test]
    fn search_hit_from_payload_full() {
        let point = make_scored_point(
            serde_json::json!({
                "code": "fn foo() {}",
                "file_path": "src/lib.rs",
                "line_start": 10,
                "line_end": 12,
                "language": "rust",
                "node_type": "function_item",
                "entity_name": "foo",
                "scope_chain": "mod::foo"
            }),
            0.9,
        );
        let hit = SearchHit::from_payload(&point).unwrap();
        assert_eq!(hit.code, "fn foo() {}");
        assert_eq!(hit.file_path, "src/lib.rs");
        assert_eq!(hit.line_range, (10, 12));
        assert!((hit.score - 0.9).abs() < f32::EPSILON);
        assert_eq!(hit.node_type.as_ref(), "function_item");
        assert_eq!(hit.language, crate::languages::Lang::Rust);
        assert_eq!(hit.entity_name, Some("foo".to_string()));
        assert_eq!(hit.scope_chain, "mod::foo");
    }

    #[test]
    fn search_hit_from_payload_no_entity_name() {
        let point = make_scored_point(
            serde_json::json!({
                "code": "struct Bar {}",
                "file_path": "src/bar.rs",
                "line_start": 1,
                "line_end": 3,
                "language": "rust",
                "node_type": "struct_item",
                "scope_chain": ""
            }),
            0.7,
        );
        let hit = SearchHit::from_payload(&point).unwrap();
        assert!(hit.entity_name.is_none());
        assert_eq!(hit.node_type.as_ref(), "struct_item");
    }

    #[test]
    fn search_hit_from_payload_missing_required_field_returns_none() {
        // Missing "code" field — should return None
        let point = make_scored_point(
            serde_json::json!({
                "file_path": "src/lib.rs",
                "line_start": 1,
                "line_end": 2,
                "language": "rust",
                "node_type": "function_item"
            }),
            0.5,
        );
        assert!(SearchHit::from_payload(&point).is_none());
    }

    /// Open an in-memory database through `zeph_db`.
    async fn setup_pool() -> zeph_db::DbPool {
        zeph_db::DbConfig {
            url: ":memory:".to_string(),
            ..Default::default()
        }
        .connect()
        .await
        .unwrap()
    }

    /// Build a `CodeStore` for tests that only exercise the `SQLite` side.
    ///
    /// The Qdrant endpoint is a placeholder (presumably nothing listens on port 1);
    /// tests using this store must not reach a code path that contacts Qdrant.
    fn offline_store(pool: zeph_db::DbPool) -> CodeStore {
        let ops = zeph_memory::QdrantOps::new("http://127.0.0.1:1", None).unwrap();
        CodeStore::with_ops(ops, pool)
    }

    /// Insert one `chunk_metadata` row with fixed line range, language and node type.
    async fn insert_row(
        pool: &zeph_db::DbPool,
        qdrant_id: &str,
        file_path: &str,
        content_hash: &str,
    ) {
        zeph_db::query(sql!(
            "INSERT INTO chunk_metadata \
             (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
             VALUES (?, ?, ?, ?, ?, ?, ?)"
        ))
        .bind(qdrant_id)
        .bind(file_path)
        .bind(content_hash)
        .bind(1_i64)
        .bind(10_i64)
        .bind("rust")
        .bind("function_item")
        .execute(pool)
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn chunk_exists_returns_false_then_true() {
        let store = offline_store(setup_pool().await);

        assert!(!store.chunk_exists("abc123").await.unwrap());

        insert_row(&store.pool, "q1", "src/main.rs", "abc123").await;

        assert!(store.chunk_exists("abc123").await.unwrap());
    }

    /// With no rows recorded for the path, `remove_file_chunks` returns before it
    /// contacts Qdrant, so the placeholder endpoint is never used.
    #[tokio::test]
    async fn remove_file_chunks_unknown_file_returns_zero() {
        let store = offline_store(setup_pool().await);
        let removed = store.remove_file_chunks("src/missing.rs").await.unwrap();
        assert_eq!(removed, 0);
    }

    /// Exercises the same SQL statements that `remove_file_chunks` issues. The
    /// method itself is not called because deleting points needs a live Qdrant.
    #[tokio::test]
    async fn remove_file_chunks_cleans_sqlite() {
        let pool = setup_pool().await;

        for i in 0..3 {
            insert_row(&pool, &format!("q{i}"), "src/lib.rs", &format!("hash{i}")).await;
        }

        let ids: Vec<(String,)> = zeph_db::query_as(sql!(
            "SELECT qdrant_id FROM chunk_metadata WHERE file_path = ?"
        ))
        .bind("src/lib.rs")
        .fetch_all(&pool)
        .await
        .unwrap();
        assert_eq!(ids.len(), 3);

        zeph_db::query(sql!("DELETE FROM chunk_metadata WHERE file_path = ?"))
            .bind("src/lib.rs")
            .execute(&pool)
            .await
            .unwrap();

        let remaining: (i64,) = zeph_db::query_as(sql!(
            "SELECT COUNT(*) FROM chunk_metadata WHERE file_path = ?"
        ))
        .bind("src/lib.rs")
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(remaining.0, 0);
    }

    #[tokio::test]
    async fn indexed_files_distinct() {
        let store = offline_store(setup_pool().await);

        // "src/a.rs" appears twice with different hashes and must be listed once.
        for (i, path) in ["src/a.rs", "src/b.rs", "src/a.rs"].iter().enumerate() {
            insert_row(&store.pool, &format!("q{i}"), path, &format!("hash{i}")).await;
        }

        let files = store.indexed_files().await.unwrap();
        assert_eq!(files.len(), 2);
        assert!(files.contains(&"src/a.rs".to_string()));
        assert!(files.contains(&"src/b.rs".to_string()));
    }

    /// Verifies that inserting the same (`file_path`, `content_hash`) twice does not
    /// produce a duplicate row — the `ON CONFLICT(file_path, content_hash)` clause
    /// must perform an UPDATE, not a second INSERT.
    ///
    /// Runs the upsert SQL directly because the store's upsert methods need a live
    /// Qdrant.
    #[tokio::test]
    async fn upsert_same_file_path_and_hash_is_idempotent() {
        let pool = setup_pool().await;

        for i in 0..2_u32 {
            zeph_db::query(sql!(
                "INSERT INTO chunk_metadata \
                 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
                 VALUES (?, ?, ?, ?, ?, ?, ?) \
                 ON CONFLICT(file_path, content_hash) DO UPDATE SET \
                   qdrant_id = excluded.qdrant_id, \
                   line_start = excluded.line_start, line_end = excluded.line_end, \
                   language = excluded.language, node_type = excluded.node_type, \
                   entity_name = excluded.entity_name"
            ))
            .bind(format!("q{i}"))
            .bind("src/lib.rs")
            .bind("dedup_hash")
            .bind(1_i64)
            .bind(5_i64)
            .bind("rust")
            .bind("function_item")
            .execute(&pool)
            .await
            .unwrap();
        }

        let count: (i64,) = zeph_db::query_as(sql!(
            "SELECT COUNT(*) FROM chunk_metadata \
             WHERE file_path = 'src/lib.rs' AND content_hash = 'dedup_hash'"
        ))
        .fetch_one(&pool)
        .await
        .unwrap();

        assert_eq!(count.0, 1, "duplicate upsert must not produce a second row");

        // The second upsert must have updated qdrant_id to the latest value.
        let qdrant_id: (String,) = zeph_db::query_as(sql!(
            "SELECT qdrant_id FROM chunk_metadata \
             WHERE file_path = 'src/lib.rs' AND content_hash = 'dedup_hash'"
        ))
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(
            qdrant_id.0, "q1",
            "qdrant_id must reflect the latest upsert"
        );
    }

    #[tokio::test]
    async fn existing_hashes_empty_input_returns_empty_set() {
        let store = offline_store(setup_pool().await);
        let result = store.existing_hashes(&[]).await.unwrap();
        assert!(result.is_empty());
    }

    #[tokio::test]
    async fn existing_hashes_chunking_above_900() {
        let store = offline_store(setup_pool().await);

        // 901 rows force `existing_hashes` to issue a second batch.
        let all_hashes: Vec<String> = (0..901).map(|i| format!("hash{i:04}")).collect();
        for (i, hash) in all_hashes.iter().enumerate() {
            insert_row(&store.pool, &format!("q{i}"), "src/lib.rs", hash).await;
        }

        let refs: Vec<&str> = all_hashes.iter().map(String::as_str).collect();
        let result = store.existing_hashes(&refs).await.unwrap();

        assert_eq!(result.len(), 901);
        // Spot-check one entry from each batch.
        assert!(result.contains("hash0000"));
        assert!(result.contains("hash0900"));
    }
}