Skip to main content

zeph_memory/graph/
resolver.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_llm::any::AnyProvider;
12use zeph_llm::provider::{LlmProvider as _, Message, Role};
13
14use super::store::GraphStore;
15use super::types::EntityType;
16use crate::embedding_store::EmbeddingStore;
17use crate::error::MemoryError;
18use crate::graph::extractor::ExtractedEntity;
19use crate::types::MessageId;
20use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
21
/// Minimum byte length for entity names — rejects noise tokens like "go", "cd".
///
/// Checked against the normalized name, so the limit is in UTF-8 bytes, not characters.
const MIN_ENTITY_NAME_BYTES: usize = 3;
/// Maximum byte length for entity names stored in the graph.
///
/// Longer names are truncated at a UTF-8 character boundary during normalization.
const MAX_ENTITY_NAME_BYTES: usize = 512;
/// Maximum byte length for relation strings.
const MAX_RELATION_BYTES: usize = 256;
/// Maximum byte length for fact strings.
///
/// Also used as the cap for summaries fed to `embed()` and for merged entity summaries.
const MAX_FACT_BYTES: usize = 2048;

/// Qdrant collection for entity embeddings.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Timeout for a single `embed()` call in seconds.
const EMBED_TIMEOUT_SECS: u64 = 30;
36
/// Remove control characters and Unicode `BiDi` embedding/override/isolate codepoints.
///
/// Dropped characters are everything `char::is_control` reports (Unicode category `Cc`,
/// which includes tab and newline) plus U+202A..=U+202E and U+2066..=U+2069.
/// All other characters are copied through unchanged and in order.
fn strip_control_chars(s: &str) -> String {
    let mut kept = String::with_capacity(s.len());
    for ch in s.chars() {
        let is_bidi_format = matches!(ch, '\u{202A}'..='\u{202E}' | '\u{2066}'..='\u{2069}');
        if ch.is_control() || is_bidi_format {
            continue;
        }
        kept.push(ch);
    }
    kept
}
43
/// Return the longest prefix of `s` that is at most `max_bytes` bytes long and ends on a
/// UTF-8 character boundary.
///
/// Strings that already fit are returned unchanged. A multi-byte character straddling the
/// limit is dropped entirely rather than split.
fn truncate_to_bytes(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Index 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
55
/// How a call to the resolver arrived at the entity id it returned.
#[derive(Clone, Debug, PartialEq)]
pub enum ResolutionOutcome {
    /// An existing entity was found by an exact alias or canonical-name lookup in `SQLite`.
    ExactMatch,
    /// The best vector-search candidate scored at or above the merge threshold;
    /// `score` is that candidate's cosine similarity.
    EmbeddingMatch { score: f32 },
    /// The candidate scored in the ambiguous range and the LLM confirmed the merge.
    LlmDisambiguated,
    /// No existing entity matched, so a new one was created.
    Created,
}
68
/// LLM response for entity disambiguation.
///
/// Deserialized from the JSON object the model is asked to return, i.e.
/// `{"same_entity": true}` or `{"same_entity": false}`.
#[derive(Debug, Deserialize, JsonSchema)]
struct DisambiguationResponse {
    /// `true` when the model judges the new mention and the existing entity to be the same.
    same_entity: bool,
}
74
/// Per-entity-name lock map used to prevent concurrent duplicate creation.
///
/// Keyed by normalized entity name; each value is a shared async mutex. Entities with
/// different names resolve concurrently; entities with the same name are serialized.
///
/// TODO(SEC-M33-02): This map grows unboundedly — one entry per unique normalized name.
/// For a short-lived resolver this is acceptable. If the resolver becomes long-lived
/// (stored in `SemanticMemory`), add eviction or use a fixed-size sharded lock array.
type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
84
/// Resolves extracted entity mentions to graph entity ids, creating entities when needed.
///
/// Exact alias and canonical-name lookups always run. Embedding similarity search and LLM
/// disambiguation run only when both an embedding store and a provider are configured.
pub struct EntityResolver<'a> {
    /// Graph store used for entity/alias lookups and upserts.
    store: &'a GraphStore,
    /// Optional vector store; needed together with `provider` for embedding-based resolution.
    embedding_store: Option<&'a Arc<EmbeddingStore>>,
    /// Optional LLM provider used for `embed()` and disambiguation chat calls.
    provider: Option<&'a AnyProvider>,
    /// Candidates scoring at or above this value are merged without asking the LLM.
    similarity_threshold: f32,
    /// Candidates scoring at or above this value (but below `similarity_threshold`) are sent
    /// to LLM disambiguation.
    ambiguous_threshold: f32,
    /// Per-normalized-name locks that serialize resolution of identical names.
    name_locks: NameLockMap,
    /// Counter for error-triggered fallbacks (embed, vector search, or LLM failures).
    /// Tests can read this via Arc.
    fallback_count: Arc<std::sync::atomic::AtomicU64>,
}
95
96impl<'a> EntityResolver<'a> {
97    #[must_use]
98    pub fn new(store: &'a GraphStore) -> Self {
99        Self {
100            store,
101            embedding_store: None,
102            provider: None,
103            similarity_threshold: 0.85,
104            ambiguous_threshold: 0.70,
105            name_locks: Arc::new(DashMap::new()),
106            fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
107        }
108    }
109
110    #[must_use]
111    pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
112        self.embedding_store = Some(store);
113        self
114    }
115
116    #[must_use]
117    pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
118        self.provider = Some(provider);
119        self
120    }
121
122    #[must_use]
123    pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
124        self.similarity_threshold = similarity;
125        self.ambiguous_threshold = ambiguous;
126        self
127    }
128
129    /// Shared fallback counter — tests can clone this Arc to inspect the value.
130    #[must_use]
131    pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
132        Arc::clone(&self.fallback_count)
133    }
134
135    /// Normalize an entity name: trim, lowercase, strip control chars, truncate.
136    fn normalize_name(name: &str) -> String {
137        let lowered = name.trim().to_lowercase();
138        let cleaned = strip_control_chars(&lowered);
139        let normalized = truncate_to_bytes(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
140        if normalized.len() < cleaned.len() {
141            tracing::debug!(
142                "graph resolver: entity name truncated to {} bytes",
143                MAX_ENTITY_NAME_BYTES
144            );
145        }
146        normalized
147    }
148
149    /// Parse an entity type string, falling back to `Concept` on unknown values.
150    fn parse_entity_type(entity_type: &str) -> EntityType {
151        entity_type
152            .trim()
153            .to_lowercase()
154            .parse::<EntityType>()
155            .unwrap_or_else(|_| {
156                tracing::debug!(
157                    "graph resolver: unknown entity type {:?}, falling back to Concept",
158                    entity_type
159                );
160                EntityType::Concept
161            })
162    }
163
164    /// Acquire the per-name lock and return the guard. Keeps lock alive for the caller.
165    async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
166        let lock = self
167            .name_locks
168            .entry(normalized.to_owned())
169            .or_insert_with(|| Arc::new(Mutex::new(())))
170            .clone();
171        lock.lock_owned().await
172    }
173
174    /// Resolve an extracted entity using the alias-first canonicalization pipeline.
175    ///
176    /// Pipeline:
177    /// 1. Normalize: trim, lowercase, strip control chars, truncate to 512 bytes.
178    /// 2. Parse entity type (fallback to Concept on unknown).
179    /// 3. Alias lookup: search `graph_entity_aliases` by normalized name + `entity_type`.
180    ///    If found, touch `last_seen_at` and return the existing entity id.
181    /// 4. Canonical name lookup: search `graph_entities` by `canonical_name` + `entity_type`.
182    ///    If found, touch `last_seen_at` and return the existing entity id.
183    /// 5. When `embedding_store` and `provider` are configured, performs embedding-based fuzzy
184    ///    matching: cosine similarity search (Qdrant), LLM disambiguation for ambiguous range,
185    ///    merge or create based on result. Failures degrade gracefully to step 6.
186    /// 6. Create: upsert new entity with `canonical_name` = normalized name.
187    /// 7. Register the normalized form (and original trimmed form if different) as aliases.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if the entity name is empty after normalization, or if a DB operation fails.
192    pub async fn resolve(
193        &self,
194        name: &str,
195        entity_type: &str,
196        summary: Option<&str>,
197    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
198        let normalized = Self::normalize_name(name);
199
200        if normalized.is_empty() {
201            return Err(MemoryError::GraphStore("empty entity name".into()));
202        }
203
204        if normalized.len() < MIN_ENTITY_NAME_BYTES {
205            return Err(MemoryError::GraphStore(format!(
206                "entity name too short: {normalized:?} ({} bytes, min {MIN_ENTITY_NAME_BYTES})",
207                normalized.len()
208            )));
209        }
210
211        let et = Self::parse_entity_type(entity_type);
212
213        // The surface form preserves the original casing for user-facing display.
214        let surface_name = name.trim().to_owned();
215
216        // Acquire per-name lock to prevent concurrent duplicate creation.
217        let _guard = self.lock_name(&normalized).await;
218
219        // Step 3: alias-first lookup (filters by entity_type to prevent cross-type collisions).
220        if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
221            self.store
222                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
223                .await?;
224            return Ok((entity.id, ResolutionOutcome::ExactMatch));
225        }
226
227        // Step 4: canonical name lookup.
228        if let Some(entity) = self.store.find_entity(&normalized, et).await? {
229            self.store
230                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
231                .await?;
232            return Ok((entity.id, ResolutionOutcome::ExactMatch));
233        }
234
235        // Step 5: Embedding-based resolution (when configured).
236        if let Some(outcome) = self
237            .resolve_via_embedding(&normalized, name, &surface_name, et, summary)
238            .await?
239        {
240            return Ok(outcome);
241        }
242
243        // Step 6: Create new entity (no embedding store, or embedding failure).
244        let entity_id = self
245            .store
246            .upsert_entity(&surface_name, &normalized, et, summary)
247            .await?;
248
249        self.register_aliases(entity_id, &normalized, name).await?;
250
251        Ok((entity_id, ResolutionOutcome::Created))
252    }
253
254    /// Compute embedding for an entity, incrementing `fallback_count` on failure/timeout.
255    /// Returns `None` when embedding is unavailable (caller should skip vector operations).
256    async fn embed_entity_text(
257        &self,
258        provider: &AnyProvider,
259        normalized: &str,
260        summary: Option<&str>,
261    ) -> Option<Vec<f32>> {
262        let safe_summary = truncate_to_bytes(summary.unwrap_or(""), MAX_FACT_BYTES);
263        let embed_text = format!("{normalized}: {safe_summary}");
264        let embed_result = tokio::time::timeout(
265            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
266            provider.embed(&embed_text),
267        )
268        .await;
269        match embed_result {
270            Ok(Ok(v)) => Some(v),
271            Ok(Err(err)) => {
272                self.fallback_count
273                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
274                tracing::warn!(entity_name = %normalized, error = %err,
275                    "embed() failed; falling back to exact-match-only entity creation");
276                None
277            }
278            Err(_timeout) => {
279                self.fallback_count
280                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
281                tracing::warn!(entity_name = %normalized,
282                    "embed() timed out after {}s; falling back to create new entity",
283                    EMBED_TIMEOUT_SECS);
284                None
285            }
286        }
287    }
288
289    /// Handle a candidate in the ambiguous score range by running LLM disambiguation.
290    /// Returns `Ok(Some(...))` if the LLM confirms a match, `Ok(None)` to fall through to create.
291    #[allow(clippy::too_many_arguments)]
292    async fn handle_ambiguous_candidate(
293        &self,
294        emb_store: &EmbeddingStore,
295        provider: &AnyProvider,
296        payload: &std::collections::HashMap<String, serde_json::Value>,
297        score: f32,
298        surface_name: &str,
299        normalized: &str,
300        et: EntityType,
301        summary: Option<&str>,
302    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
303        let entity_id = payload
304            .get("entity_id")
305            .and_then(serde_json::Value::as_i64)
306            .ok_or_else(|| MemoryError::GraphStore("missing entity_id in payload".into()))?;
307        let existing_name = payload
308            .get("name")
309            .and_then(|v| v.as_str())
310            .unwrap_or("")
311            .to_owned();
312        let existing_summary = payload
313            .get("summary")
314            .and_then(|v| v.as_str())
315            .unwrap_or("")
316            .to_owned();
317        // Use the existing entity's actual type from the payload (IC-S3)
318        let existing_type = payload
319            .get("entity_type")
320            .and_then(|v| v.as_str())
321            .unwrap_or(et.as_str())
322            .to_owned();
323        match self
324            .llm_disambiguate(
325                provider,
326                normalized,
327                et.as_str(),
328                summary.unwrap_or(""),
329                &existing_name,
330                &existing_type,
331                &existing_summary,
332                score,
333            )
334            .await
335        {
336            Some(true) => {
337                self.merge_entity(
338                    emb_store,
339                    provider,
340                    entity_id,
341                    surface_name,
342                    normalized,
343                    et,
344                    summary,
345                )
346                .await?;
347                Ok(Some((entity_id, ResolutionOutcome::LlmDisambiguated)))
348            }
349            Some(false) => Ok(None),
350            None => {
351                self.fallback_count
352                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
353                tracing::warn!(entity_name = %normalized,
354                    "LLM disambiguation failed; falling back to create new entity");
355                Ok(None)
356            }
357        }
358    }
359
360    /// Attempt embedding-based resolution. Returns `Ok(Some(...))` if resolved (early return),
361    /// `Ok(None)` if no match found (caller should fall through to create), or `Err` on DB error.
362    async fn resolve_via_embedding(
363        &self,
364        normalized: &str,
365        original_name: &str,
366        surface_name: &str,
367        et: EntityType,
368        summary: Option<&str>,
369    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
370        let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) else {
371            return Ok(None);
372        };
373
374        let Some(query_vec) = self.embed_entity_text(provider, normalized, summary).await else {
375            return Ok(None);
376        };
377
378        let type_filter = VectorFilter {
379            must: vec![FieldCondition {
380                field: "entity_type".into(),
381                value: FieldValue::Text(et.as_str().to_owned()),
382            }],
383            must_not: vec![],
384        };
385        let candidates = match emb_store
386            .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
387            .await
388        {
389            Ok(c) => c,
390            Err(err) => {
391                self.fallback_count
392                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
393                tracing::warn!(entity_name = %normalized, error = %err,
394                    "Qdrant search failed; falling back to create new entity");
395                return self
396                    .create_with_embedding(
397                        emb_store,
398                        surface_name,
399                        normalized,
400                        original_name,
401                        et,
402                        summary,
403                        &query_vec,
404                    )
405                    .await
406                    .map(Some);
407            }
408        };
409
410        if let Some(best) = candidates.first() {
411            let score = best.score;
412            if score >= self.similarity_threshold {
413                let entity_id = best
414                    .payload
415                    .get("entity_id")
416                    .and_then(serde_json::Value::as_i64)
417                    .ok_or_else(|| {
418                        MemoryError::GraphStore("missing entity_id in payload".into())
419                    })?;
420                self.merge_entity(
421                    emb_store,
422                    provider,
423                    entity_id,
424                    surface_name,
425                    normalized,
426                    et,
427                    summary,
428                )
429                .await?;
430                return Ok(Some((
431                    entity_id,
432                    ResolutionOutcome::EmbeddingMatch { score },
433                )));
434            } else if score >= self.ambiguous_threshold
435                && let Some(result) = self
436                    .handle_ambiguous_candidate(
437                        emb_store,
438                        provider,
439                        &best.payload,
440                        score,
441                        surface_name,
442                        normalized,
443                        et,
444                        summary,
445                    )
446                    .await?
447            {
448                return Ok(Some(result));
449            }
450            // score < ambiguous_threshold or LLM said different: fall through to create with embedding
451        }
452
453        // No suitable match — create new entity and store embedding.
454        self.create_with_embedding(
455            emb_store,
456            surface_name,
457            normalized,
458            original_name,
459            et,
460            summary,
461            &query_vec,
462        )
463        .await
464        .map(Some)
465    }
466
467    /// Create a new entity, register aliases, and store its embedding in Qdrant.
468    #[allow(clippy::too_many_arguments)]
469    async fn create_with_embedding(
470        &self,
471        emb_store: &EmbeddingStore,
472        surface_name: &str,
473        normalized: &str,
474        original_name: &str,
475        et: EntityType,
476        summary: Option<&str>,
477        query_vec: &[f32],
478    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
479        let entity_id = self
480            .store
481            .upsert_entity(surface_name, normalized, et, summary)
482            .await?;
483        self.register_aliases(entity_id, normalized, original_name)
484            .await?;
485        self.store_entity_embedding(
486            emb_store,
487            entity_id,
488            None,
489            normalized,
490            et,
491            summary.unwrap_or(""),
492            query_vec,
493        )
494        .await;
495        Ok((entity_id, ResolutionOutcome::Created))
496    }
497
498    /// Register the normalized form and original trimmed form as aliases for an entity.
499    async fn register_aliases(
500        &self,
501        entity_id: i64,
502        normalized: &str,
503        original_name: &str,
504    ) -> Result<(), MemoryError> {
505        self.store.add_alias(entity_id, normalized).await?;
506
507        // Also register the original trimmed lowercased form if it differs from normalized
508        // (e.g. when control chars were stripped, leaving a shorter string).
509        let original_trimmed = original_name.trim().to_lowercase();
510        let original_clean_str = strip_control_chars(&original_trimmed);
511        let original_clean = truncate_to_bytes(&original_clean_str, MAX_ENTITY_NAME_BYTES);
512        if original_clean != normalized {
513            self.store.add_alias(entity_id, original_clean).await?;
514        }
515
516        Ok(())
517    }
518
519    /// Merge an existing entity with new information: combine summaries, update Qdrant.
520    #[allow(clippy::too_many_arguments)]
521    async fn merge_entity(
522        &self,
523        emb_store: &EmbeddingStore,
524        provider: &AnyProvider,
525        entity_id: i64,
526        new_surface_name: &str,
527        new_canonical_name: &str,
528        entity_type: EntityType,
529        new_summary: Option<&str>,
530    ) -> Result<(), MemoryError> {
531        // TODO(PERF-03): The Qdrant payload already contains name/summary at the call site;
532        // pass them in as parameters to eliminate this extra SQLite roundtrip per merge.
533        let existing = self.store.find_entity_by_id(entity_id).await?;
534        let existing_summary = existing
535            .as_ref()
536            .and_then(|e| e.summary.as_deref())
537            .unwrap_or("");
538
539        let merged_summary = if let Some(new) = new_summary {
540            if !new.is_empty() && !existing_summary.is_empty() {
541                let combined = format!("{existing_summary}; {new}");
542                // TODO(S2): use LLM-based summary merge when summary exceeds 512 bytes
543                truncate_to_bytes(&combined, MAX_FACT_BYTES).to_owned()
544            } else if !new.is_empty() {
545                new.to_owned()
546            } else {
547                existing_summary.to_owned()
548            }
549        } else {
550            existing_summary.to_owned()
551        };
552
553        let summary_opt = if merged_summary.is_empty() {
554            None
555        } else {
556            Some(merged_summary.as_str())
557        };
558
559        // Update the EXISTING entity's summary (keep its canonical_name, update surface display name).
560        let existing_canonical = existing.as_ref().map_or_else(
561            || new_canonical_name.to_owned(),
562            |e| e.canonical_name.clone(),
563        );
564        let existing_name_owned = existing
565            .as_ref()
566            .map_or_else(|| new_surface_name.to_owned(), |e| e.name.clone());
567        self.store
568            .upsert_entity(
569                &existing_name_owned,
570                &existing_canonical,
571                entity_type,
572                summary_opt,
573            )
574            .await?;
575
576        // Retrieve existing qdrant_point_id to reuse it (avoids orphaned stale points, IC-S1)
577        let existing_point_id = existing
578            .as_ref()
579            .and_then(|e| e.qdrant_point_id.as_deref())
580            .map(ToOwned::to_owned);
581
582        // Re-embed merged text and upsert to Qdrant
583        let embed_text = format!("{existing_name_owned}: {merged_summary}");
584        let embed_result = tokio::time::timeout(
585            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
586            provider.embed(&embed_text),
587        )
588        .await;
589
590        match embed_result {
591            Ok(Ok(vec)) => {
592                self.store_entity_embedding(
593                    emb_store,
594                    entity_id,
595                    existing_point_id.as_deref(),
596                    &existing_name_owned,
597                    entity_type,
598                    &merged_summary,
599                    &vec,
600                )
601                .await;
602            }
603            Ok(Err(err)) => {
604                tracing::warn!(
605                    entity_id,
606                    error = %err,
607                    "merge re-embed failed; Qdrant entry may be stale"
608                );
609            }
610            Err(_) => {
611                tracing::warn!(
612                    entity_id,
613                    "merge re-embed timed out; Qdrant entry may be stale"
614                );
615            }
616        }
617
618        Ok(())
619    }
620
621    /// Store an entity embedding in Qdrant and update `qdrant_point_id` in `SQLite`.
622    ///
623    /// When `existing_point_id` is `Some`, the existing Qdrant point is updated in-place
624    /// (upsert by ID) to avoid orphaned stale points. When `None`, a new point is created.
625    ///
626    /// Failures are logged at warn level but do not propagate — the entity is still
627    /// valid in `SQLite` even if Qdrant upsert fails.
628    #[allow(clippy::too_many_arguments)]
629    async fn store_entity_embedding(
630        &self,
631        emb_store: &EmbeddingStore,
632        entity_id: i64,
633        existing_point_id: Option<&str>,
634        name: &str,
635        entity_type: EntityType,
636        summary: &str,
637        vector: &[f32],
638    ) {
639        // TODO(PERF-05): ensure_named_collection() is called on every store_entity_embedding()
640        // invocation, generating one Qdrant network roundtrip per entity in a batch. Cache this
641        // result at resolver construction time via `std::sync::OnceLock<bool>` to call it once.
642        let vector_size = u64::try_from(vector.len()).unwrap_or(384);
643        if let Err(err) = emb_store
644            .ensure_named_collection(ENTITY_COLLECTION, vector_size)
645            .await
646        {
647            tracing::error!(
648                error = %err,
649                "failed to ensure entity embedding collection; skipping Qdrant upsert"
650            );
651            return;
652        }
653
654        let payload = serde_json::json!({
655            "entity_id": entity_id,
656            "name": name,
657            "entity_type": entity_type.as_str(),
658            "summary": summary,
659        });
660
661        if let Some(point_id) = existing_point_id {
662            // Reuse existing point to avoid orphaned stale points (IC-S1)
663            if let Err(err) = emb_store
664                .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
665                .await
666            {
667                tracing::warn!(
668                    entity_id,
669                    error = %err,
670                    "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
671                );
672            }
673        } else {
674            match emb_store
675                .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
676                .await
677            {
678                Ok(point_id) => {
679                    if let Err(err) = self
680                        .store
681                        .set_entity_qdrant_point_id(entity_id, &point_id)
682                        .await
683                    {
684                        tracing::warn!(
685                            entity_id,
686                            error = %err,
687                            "failed to store qdrant_point_id in SQLite"
688                        );
689                    }
690                }
691                Err(err) => {
692                    tracing::warn!(
693                        entity_id,
694                        error = %err,
695                        "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
696                    );
697                }
698            }
699        }
700    }
701
702    /// Ask the LLM whether two entities are the same.
703    ///
704    /// Returns `Some(true)` for merge, `Some(false)` for separate, `None` on failure.
705    #[allow(clippy::too_many_arguments)]
706    async fn llm_disambiguate(
707        &self,
708        provider: &AnyProvider,
709        new_name: &str,
710        new_type: &str,
711        new_summary: &str,
712        existing_name: &str,
713        existing_type: &str,
714        existing_summary: &str,
715        score: f32,
716    ) -> Option<bool> {
717        let prompt = format!(
718            "New entity:\n\
719             - Name: <external-data>{new_name}</external-data>\n\
720             - Type: <external-data>{new_type}</external-data>\n\
721             - Summary: <external-data>{new_summary}</external-data>\n\
722             \n\
723             Existing entity:\n\
724             - Name: <external-data>{existing_name}</external-data>\n\
725             - Type: <external-data>{existing_type}</external-data>\n\
726             - Summary: <external-data>{existing_summary}</external-data>\n\
727             \n\
728             Cosine similarity: {score:.2}\n\
729             \n\
730             Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
731        );
732
733        let messages = [
734            Message::from_legacy(
735                Role::System,
736                "You are an entity disambiguation assistant. Given a new entity mention and \
737                 an existing entity from the knowledge graph, determine if they refer to the same \
738                 real-world entity. Respond only with JSON.",
739            ),
740            Message::from_legacy(Role::User, prompt),
741        ];
742
743        let response = match provider.chat(&messages).await {
744            Ok(r) => r,
745            Err(err) => {
746                tracing::warn!(error = %err, "LLM disambiguation chat failed");
747                return None;
748            }
749        };
750
751        // Parse JSON response, tolerating markdown code fences
752        let json_str = extract_json(&response);
753        match serde_json::from_str::<DisambiguationResponse>(json_str) {
754            Ok(parsed) => Some(parsed.same_entity),
755            Err(err) => {
756                tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
757                None
758            }
759        }
760    }
761
762    /// Resolve a batch of extracted entities concurrently.
763    ///
764    /// Returns a `Vec` of `(entity_id, ResolutionOutcome)` in the same order as input.
765    ///
766    /// # Errors
767    ///
768    /// Returns an error if any DB operation fails.
769    ///
770    /// # Panics
771    ///
772    /// Panics if an internal stream collection bug causes a result index to be missing.
773    /// This indicates a programming error and should never occur in correct usage.
774    pub async fn resolve_batch(
775        &self,
776        entities: &[ExtractedEntity],
777    ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
778        if entities.is_empty() {
779            return Ok(Vec::new());
780        }
781
782        // Process up to 4 embed+resolve operations concurrently (IC-S2/PERF-01).
783        let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
784
785        let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
786            let name = e.name.clone();
787            let entity_type = e.entity_type.clone();
788            let summary = e.summary.clone();
789            async move {
790                let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
791                (i, result)
792            }
793        }))
794        .buffer_unordered(4);
795
796        while let Some((i, result)) = stream.next().await {
797            match result {
798                Ok(outcome) => results[i] = Some(outcome),
799                Err(err) => return Err(err),
800            }
801        }
802
803        Ok(results
804            .into_iter()
805            .enumerate()
806            .map(|(i, r)| {
807                r.unwrap_or_else(|| {
808                    tracing::warn!(
809                        index = i,
810                        "resolve_batch: missing result at index — bug in stream collection"
811                    );
812                    panic!("resolve_batch: missing result at index {i}")
813                })
814            })
815            .collect())
816    }
817
818    /// Resolve an extracted edge: deduplicate or supersede existing edges.
819    ///
820    /// - If an active edge with the same direction and relation exists with an identical fact,
821    ///   returns `None` (deduplicated).
822    /// - If an active edge with the same direction and relation exists with a different fact,
823    ///   invalidates the old edge and inserts the new one, returning `Some(new_id)`.
824    /// - If no matching edge exists, inserts a new edge and returns `Some(new_id)`.
825    ///
826    /// Relation and fact strings are sanitized (control chars stripped, length-capped).
827    ///
828    /// # Errors
829    ///
830    /// Returns an error if any database operation fails.
831    pub async fn resolve_edge(
832        &self,
833        source_id: i64,
834        target_id: i64,
835        relation: &str,
836        fact: &str,
837        confidence: f32,
838        episode_id: Option<MessageId>,
839    ) -> Result<Option<i64>, MemoryError> {
840        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
841        let normalized_relation = truncate_to_bytes(&relation_clean, MAX_RELATION_BYTES).to_owned();
842
843        let fact_clean = strip_control_chars(fact.trim());
844        let normalized_fact = truncate_to_bytes(&fact_clean, MAX_FACT_BYTES).to_owned();
845
846        // Fetch only exact-direction edges — no reverse edges to filter out
847        let existing_edges = self.store.edges_exact(source_id, target_id).await?;
848
849        let matching = existing_edges
850            .iter()
851            .find(|e| e.relation == normalized_relation);
852
853        if let Some(old) = matching {
854            if old.fact == normalized_fact {
855                // Exact duplicate — skip
856                return Ok(None);
857            }
858            // Same relation, different fact — supersede
859            self.store.invalidate_edge(old.id).await?;
860        }
861
862        let new_id = self
863            .store
864            .insert_edge(
865                source_id,
866                target_id,
867                &normalized_relation,
868                &normalized_fact,
869                confidence,
870                episode_id,
871            )
872            .await?;
873        Ok(Some(new_id))
874    }
875}
876
/// Extract a JSON object from a string that may be wrapped in a markdown code fence.
///
/// Resolution order, applied to the trimmed input:
///
/// 1. If it opens with a triple-backtick fence and contains a later closing fence, the text
///    between the two is returned, trimmed. When the opening fence line holds nothing but a
///    language tag (`json`, `JSON`, `json5`, `javascript`, …) and content follows on later
///    lines, that tag line is dropped so it never leaks into the result. A literal `json`
///    tag glued directly to single-line content is stripped as well.
/// 2. Otherwise, the span from the first `{` to the last `}` is returned.
/// 3. Otherwise, the trimmed input is returned unchanged.
fn extract_json(s: &str) -> &str {
    let trimmed = s.trim();

    // Fenced block: opening fence at the very start, closing fence somewhere after it.
    if let Some(after_open) = trimmed.strip_prefix("```") {
        if let Some(close) = after_open.rfind("```") {
            let fenced = &after_open[..close];

            // Opening line that is only a language tag, followed by real content.
            if let Some((info, rest)) = fenced.split_once('\n') {
                let is_tag = info
                    .trim()
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '+' | '-' | '.' | '#'));
                if is_tag && !rest.trim().is_empty() {
                    return rest.trim();
                }
            }

            // Single-line fence, empty body, or a first line that is already content:
            // only a literal `json` tag is removed.
            return fenced.strip_prefix("json").unwrap_or(fenced).trim();
        }
    }

    // Find first '{' to last '}'
    if let (Some(start), Some(end)) = (trimmed.find('{'), trimmed.rfind('}')) {
        if start < end {
            return &trimmed[start..=end];
        }
    }
    trimmed
}
899
900#[cfg(test)]
901mod tests {
902    use std::sync::Arc;
903
904    use super::*;
905    use crate::in_memory_store::InMemoryVectorStore;
906    use crate::sqlite::SqliteStore;
907
908    async fn setup() -> GraphStore {
909        let store = SqliteStore::new(":memory:").await.unwrap();
910        GraphStore::new(store.pool().clone())
911    }
912
913    async fn setup_with_embedding() -> (GraphStore, Arc<EmbeddingStore>) {
914        let sqlite = SqliteStore::new(":memory:").await.unwrap();
915        let pool = sqlite.pool().clone();
916        let mem_store = Box::new(InMemoryVectorStore::new());
917        let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool));
918        let gs = GraphStore::new(sqlite.pool().clone());
919        (gs, emb)
920    }
921
922    fn make_mock_provider_with_embedding(embedding: Vec<f32>) -> zeph_llm::mock::MockProvider {
923        let mut p = zeph_llm::mock::MockProvider::default();
924        p.embedding = embedding;
925        p.supports_embeddings = true;
926        p
927    }
928
929    // ── Existing tests (resolve() with no embedding store — exact match only) ──
930
931    #[tokio::test]
932    async fn resolve_creates_new_entity() {
933        let gs = setup().await;
934        let resolver = EntityResolver::new(&gs);
935        let (id, outcome) = resolver
936            .resolve("alice", "person", Some("a person"))
937            .await
938            .unwrap();
939        assert!(id > 0);
940        assert_eq!(outcome, ResolutionOutcome::Created);
941    }
942
943    #[tokio::test]
944    async fn resolve_updates_existing_entity() {
945        let gs = setup().await;
946        let resolver = EntityResolver::new(&gs);
947        let (id1, _) = resolver.resolve("alice", "person", None).await.unwrap();
948        let (id2, outcome) = resolver
949            .resolve("alice", "person", Some("updated summary"))
950            .await
951            .unwrap();
952        assert_eq!(id1, id2);
953        assert_eq!(outcome, ResolutionOutcome::ExactMatch);
954
955        let entity = gs
956            .find_entity("alice", EntityType::Person)
957            .await
958            .unwrap()
959            .unwrap();
960        assert_eq!(entity.summary.as_deref(), Some("updated summary"));
961    }
962
963    #[tokio::test]
964    async fn resolve_unknown_type_falls_back_to_concept() {
965        let gs = setup().await;
966        let resolver = EntityResolver::new(&gs);
967        let (id, _) = resolver
968            .resolve("my_thing", "unknown_type", None)
969            .await
970            .unwrap();
971        assert!(id > 0);
972
973        // Verify it was stored as Concept
974        let entity = gs
975            .find_entity("my_thing", EntityType::Concept)
976            .await
977            .unwrap();
978        assert!(entity.is_some());
979    }
980
981    #[tokio::test]
982    async fn resolve_empty_name_returns_error() {
983        let gs = setup().await;
984        let resolver = EntityResolver::new(&gs);
985
986        let result_empty = resolver.resolve("", "concept", None).await;
987        assert!(result_empty.is_err());
988        assert!(matches!(
989            result_empty.unwrap_err(),
990            MemoryError::GraphStore(_)
991        ));
992
993        let result_whitespace = resolver.resolve("   ", "concept", None).await;
994        assert!(result_whitespace.is_err());
995    }
996
997    // FIX-3 defense-in-depth: short entity names must be rejected at the resolver level.
998    #[tokio::test]
999    async fn resolve_short_name_below_min_returns_error() {
1000        let gs = setup().await;
1001        let resolver = EntityResolver::new(&gs);
1002
1003        // "go" and "cd" are 2-byte tokens that represent common noise from tool output.
1004        let err_go = resolver.resolve("go", "technology", None).await;
1005        assert!(err_go.is_err(), "\"go\" (2 bytes) must be rejected");
1006        assert!(
1007            matches!(err_go.unwrap_err(), MemoryError::GraphStore(_)),
1008            "expected GraphStore error for short name"
1009        );
1010
1011        let err_cd = resolver.resolve("cd", "concept", None).await;
1012        assert!(err_cd.is_err(), "\"cd\" (2 bytes) must be rejected");
1013    }
1014
1015    #[tokio::test]
1016    async fn resolve_name_at_min_length_passes() {
1017        let gs = setup().await;
1018        let resolver = EntityResolver::new(&gs);
1019
1020        // "git" is exactly 3 bytes — must be accepted.
1021        let result = resolver.resolve("git", "technology", None).await;
1022        assert!(
1023            result.is_ok(),
1024            "\"git\" (3 bytes) must pass min-length check"
1025        );
1026    }
1027
1028    #[tokio::test]
1029    async fn resolve_case_insensitive() {
1030        let gs = setup().await;
1031        let resolver = EntityResolver::new(&gs);
1032
1033        let (id1, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1034        let (id2, outcome) = resolver.resolve("rust", "language", None).await.unwrap();
1035        assert_eq!(
1036            id1, id2,
1037            "'Rust' and 'rust' should resolve to the same entity"
1038        );
1039        assert_eq!(outcome, ResolutionOutcome::ExactMatch);
1040    }
1041
1042    #[tokio::test]
1043    async fn resolve_edge_inserts_new() {
1044        let gs = setup().await;
1045        let resolver = EntityResolver::new(&gs);
1046
1047        let src = gs
1048            .upsert_entity("src", "src", EntityType::Concept, None)
1049            .await
1050            .unwrap();
1051        let tgt = gs
1052            .upsert_entity("tgt", "tgt", EntityType::Concept, None)
1053            .await
1054            .unwrap();
1055
1056        let result = resolver
1057            .resolve_edge(src, tgt, "uses", "src uses tgt", 0.9, None)
1058            .await
1059            .unwrap();
1060        assert!(result.is_some());
1061        assert!(result.unwrap() > 0);
1062    }
1063
1064    #[tokio::test]
1065    async fn resolve_edge_deduplicates_identical() {
1066        let gs = setup().await;
1067        let resolver = EntityResolver::new(&gs);
1068
1069        let src = gs
1070            .upsert_entity("a", "a", EntityType::Concept, None)
1071            .await
1072            .unwrap();
1073        let tgt = gs
1074            .upsert_entity("b", "b", EntityType::Concept, None)
1075            .await
1076            .unwrap();
1077
1078        let first = resolver
1079            .resolve_edge(src, tgt, "uses", "a uses b", 0.9, None)
1080            .await
1081            .unwrap();
1082        assert!(first.is_some());
1083
1084        let second = resolver
1085            .resolve_edge(src, tgt, "uses", "a uses b", 0.9, None)
1086            .await
1087            .unwrap();
1088        assert!(second.is_none(), "identical edge should be deduplicated");
1089    }
1090
1091    #[tokio::test]
1092    async fn resolve_edge_supersedes_contradictory() {
1093        let gs = setup().await;
1094        let resolver = EntityResolver::new(&gs);
1095
1096        let src = gs
1097            .upsert_entity("x", "x", EntityType::Concept, None)
1098            .await
1099            .unwrap();
1100        let tgt = gs
1101            .upsert_entity("y", "y", EntityType::Concept, None)
1102            .await
1103            .unwrap();
1104
1105        let first_id = resolver
1106            .resolve_edge(src, tgt, "prefers", "x prefers y (old)", 0.8, None)
1107            .await
1108            .unwrap()
1109            .unwrap();
1110
1111        let second_id = resolver
1112            .resolve_edge(src, tgt, "prefers", "x prefers y (new)", 0.9, None)
1113            .await
1114            .unwrap()
1115            .unwrap();
1116
1117        assert_ne!(first_id, second_id, "superseded edge should have a new ID");
1118
1119        // Old edge should be invalidated
1120        let active_count = gs.active_edge_count().await.unwrap();
1121        assert_eq!(active_count, 1, "only new edge should be active");
1122    }
1123
1124    #[tokio::test]
1125    async fn resolve_edge_direction_sensitive() {
1126        // A->B "uses" should not interfere with B->A "uses" dedup
1127        let gs = setup().await;
1128        let resolver = EntityResolver::new(&gs);
1129
1130        let a = gs
1131            .upsert_entity("node_a", "node_a", EntityType::Concept, None)
1132            .await
1133            .unwrap();
1134        let b = gs
1135            .upsert_entity("node_b", "node_b", EntityType::Concept, None)
1136            .await
1137            .unwrap();
1138
1139        // Insert A->B
1140        let id1 = resolver
1141            .resolve_edge(a, b, "uses", "A uses B", 0.9, None)
1142            .await
1143            .unwrap();
1144        assert!(id1.is_some());
1145
1146        // Insert B->A with different fact — should NOT invalidate A->B (different direction)
1147        let id2 = resolver
1148            .resolve_edge(b, a, "uses", "B uses A (different direction)", 0.9, None)
1149            .await
1150            .unwrap();
1151        assert!(id2.is_some());
1152
1153        // Both edges should still be active
1154        let active_count = gs.active_edge_count().await.unwrap();
1155        assert_eq!(active_count, 2, "both directional edges should be active");
1156    }
1157
1158    #[tokio::test]
1159    async fn resolve_edge_normalizes_relation_case() {
1160        let gs = setup().await;
1161        let resolver = EntityResolver::new(&gs);
1162
1163        let src = gs
1164            .upsert_entity("p", "p", EntityType::Concept, None)
1165            .await
1166            .unwrap();
1167        let tgt = gs
1168            .upsert_entity("q", "q", EntityType::Concept, None)
1169            .await
1170            .unwrap();
1171
1172        // Insert with uppercase relation
1173        let id1 = resolver
1174            .resolve_edge(src, tgt, "Uses", "p uses q", 0.9, None)
1175            .await
1176            .unwrap();
1177        assert!(id1.is_some());
1178
1179        // Insert with lowercase relation — same normalized relation, same fact → deduplicate
1180        let id2 = resolver
1181            .resolve_edge(src, tgt, "uses", "p uses q", 0.9, None)
1182            .await
1183            .unwrap();
1184        assert!(id2.is_none(), "normalized relations should deduplicate");
1185    }
1186
1187    // ── IC-01: entity_type lowercased before parse ────────────────────────────
1188
1189    #[tokio::test]
1190    async fn resolve_entity_type_uppercase_parsed_correctly() {
1191        let gs = setup().await;
1192        let resolver = EntityResolver::new(&gs);
1193
1194        // "Person" (title case from LLM) should parse as EntityType::Person, not fall back to Concept
1195        let (id, _) = resolver
1196            .resolve("test_entity", "Person", None)
1197            .await
1198            .unwrap();
1199        assert!(id > 0);
1200
1201        let entity = gs
1202            .find_entity("test_entity", EntityType::Person)
1203            .await
1204            .unwrap();
1205        assert!(entity.is_some(), "entity should be stored as Person type");
1206    }
1207
1208    #[tokio::test]
1209    async fn resolve_entity_type_all_caps_parsed_correctly() {
1210        let gs = setup().await;
1211        let resolver = EntityResolver::new(&gs);
1212
1213        let (id, _) = resolver.resolve("my_lang", "LANGUAGE", None).await.unwrap();
1214        assert!(id > 0);
1215
1216        let entity = gs
1217            .find_entity("my_lang", EntityType::Language)
1218            .await
1219            .unwrap();
1220        assert!(entity.is_some(), "entity should be stored as Language type");
1221    }
1222
1223    // ── SEC-GRAPH-01: entity name length cap ──────────────────────────────────
1224
1225    #[tokio::test]
1226    async fn resolve_truncates_long_entity_name() {
1227        let gs = setup().await;
1228        let resolver = EntityResolver::new(&gs);
1229
1230        let long_name = "a".repeat(1024);
1231        let (id, _) = resolver.resolve(&long_name, "concept", None).await.unwrap();
1232        assert!(id > 0);
1233
1234        // Entity should exist with a truncated name (512 bytes)
1235        let entity = gs
1236            .find_entity(&"a".repeat(512), EntityType::Concept)
1237            .await
1238            .unwrap();
1239        assert!(entity.is_some(), "truncated name should be stored");
1240    }
1241
1242    // ── SEC-GRAPH-02: control character stripping ─────────────────────────────
1243
1244    #[tokio::test]
1245    async fn resolve_strips_control_chars_from_name() {
1246        let gs = setup().await;
1247        let resolver = EntityResolver::new(&gs);
1248
1249        // Name with null byte and a BiDi override
1250        let name_with_ctrl = "rust\x00lang";
1251        let (id, _) = resolver
1252            .resolve(name_with_ctrl, "language", None)
1253            .await
1254            .unwrap();
1255        assert!(id > 0);
1256
1257        // Stored name should have control chars removed
1258        let entity = gs
1259            .find_entity("rustlang", EntityType::Language)
1260            .await
1261            .unwrap();
1262        assert!(
1263            entity.is_some(),
1264            "control chars should be stripped from stored name"
1265        );
1266    }
1267
1268    #[tokio::test]
1269    async fn resolve_strips_bidi_overrides_from_name() {
1270        let gs = setup().await;
1271        let resolver = EntityResolver::new(&gs);
1272
1273        // U+202E is RIGHT-TO-LEFT OVERRIDE — a BiDi spoof character
1274        let name_with_bidi = "rust\u{202E}lang";
1275        let (id, _) = resolver
1276            .resolve(name_with_bidi, "language", None)
1277            .await
1278            .unwrap();
1279        assert!(id > 0);
1280
1281        let entity = gs
1282            .find_entity("rustlang", EntityType::Language)
1283            .await
1284            .unwrap();
1285        assert!(entity.is_some(), "BiDi override chars should be stripped");
1286    }
1287
1288    // ── Helper unit tests for sanitization functions ──────────────────────────
1289
1290    #[test]
1291    fn strip_control_chars_removes_ascii_controls() {
1292        assert_eq!(strip_control_chars("hello\x00world"), "helloworld");
1293        assert_eq!(strip_control_chars("tab\there"), "tabhere");
1294        assert_eq!(strip_control_chars("new\nline"), "newline");
1295    }
1296
1297    #[test]
1298    fn strip_control_chars_removes_bidi() {
1299        let bidi = "\u{202E}spoof";
1300        assert_eq!(strip_control_chars(bidi), "spoof");
1301    }
1302
1303    #[test]
1304    fn strip_control_chars_preserves_normal_unicode() {
1305        assert_eq!(strip_control_chars("привет мир"), "привет мир");
1306        assert_eq!(strip_control_chars("日本語"), "日本語");
1307    }
1308
1309    #[test]
1310    fn truncate_to_bytes_exact_boundary() {
1311        let s = "hello";
1312        assert_eq!(truncate_to_bytes(s, 5), "hello");
1313        assert_eq!(truncate_to_bytes(s, 3), "hel");
1314    }
1315
1316    #[test]
1317    fn truncate_to_bytes_respects_utf8_boundary() {
1318        // "é" is 2 bytes in UTF-8 — truncating at 1 byte should give ""
1319        let s = "élan";
1320        let truncated = truncate_to_bytes(s, 1);
1321        assert!(s.is_char_boundary(truncated.len()));
1322    }
1323
1324    // ── New tests: embedding-based resolution ─────────────────────────────────
1325
1326    #[tokio::test]
1327    async fn resolve_with_embedding_store_score_above_threshold_merges() {
1328        let (gs, emb) = setup_with_embedding().await;
1329        // Pre-insert an existing entity (different name to avoid exact match).
1330        // "python programming lang" is in Qdrant; we resolve "python scripting lang"
1331        // which embeds to the identical vector → cosine similarity = 1.0 > 0.85 → merge.
1332        let existing_id = gs
1333            .upsert_entity(
1334                "python programming lang",
1335                "python programming lang",
1336                EntityType::Language,
1337                Some("a programming language"),
1338            )
1339            .await
1340            .unwrap();
1341
1342        let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1343        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1344            .await
1345            .unwrap();
1346        let payload = serde_json::json!({
1347            "entity_id": existing_id,
1348            "name": "python programming lang",
1349            "entity_type": "language",
1350            "summary": "a programming language",
1351        });
1352        let point_id = emb
1353            .store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1354            .await
1355            .unwrap();
1356        gs.set_entity_qdrant_point_id(existing_id, &point_id)
1357            .await
1358            .unwrap();
1359
1360        // Mock provider returns the same vector for any text → cosine similarity = 1.0 > 0.85
1361        let provider = make_mock_provider_with_embedding(mock_vec);
1362        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1363
1364        let resolver = EntityResolver::new(&gs)
1365            .with_embedding_store(&emb)
1366            .with_provider(&any_provider)
1367            .with_thresholds(0.85, 0.70);
1368
1369        // Resolve a different name — no exact match, embedding match wins
1370        let (id, outcome) = resolver
1371            .resolve(
1372                "python scripting lang",
1373                "language",
1374                Some("scripting language"),
1375            )
1376            .await
1377            .unwrap();
1378
1379        assert_eq!(id, existing_id, "should return existing entity ID on merge");
1380        assert!(
1381            matches!(outcome, ResolutionOutcome::EmbeddingMatch { score } if score > 0.85),
1382            "outcome should be EmbeddingMatch with score > 0.85, got {outcome:?}"
1383        );
1384    }
1385
1386    #[tokio::test]
1387    async fn resolve_with_embedding_store_score_below_ambiguous_creates_new() {
1388        let (gs, emb) = setup_with_embedding().await;
1389        // Insert existing entity with orthogonal vector
1390        let existing_id = gs
1391            .upsert_entity("java", "java", EntityType::Language, Some("java language"))
1392            .await
1393            .unwrap();
1394
1395        // Existing uses [1,0,0,0]; new entity will embed to [0,1,0,0] (orthogonal, score=0)
1396        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1397            .await
1398            .unwrap();
1399        let payload = serde_json::json!({
1400            "entity_id": existing_id,
1401            "name": "java",
1402            "entity_type": "language",
1403            "summary": "java language",
1404        });
1405        emb.store_to_collection(ENTITY_COLLECTION, payload, vec![1.0, 0.0, 0.0, 0.0])
1406            .await
1407            .unwrap();
1408
1409        // Mock returns orthogonal vector → score = 0.0 < 0.70
1410        let provider = make_mock_provider_with_embedding(vec![0.0, 1.0, 0.0, 0.0]);
1411        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1412
1413        let resolver = EntityResolver::new(&gs)
1414            .with_embedding_store(&emb)
1415            .with_provider(&any_provider)
1416            .with_thresholds(0.85, 0.70);
1417
1418        let (id, outcome) = resolver
1419            .resolve("kotlin", "language", Some("kotlin language"))
1420            .await
1421            .unwrap();
1422
1423        assert_ne!(id, existing_id, "orthogonal entity should create new");
1424        assert_eq!(outcome, ResolutionOutcome::Created);
1425    }
1426
1427    #[tokio::test]
1428    async fn resolve_with_embedding_failure_falls_back_to_create() {
1429        // Use a mock with supports_embeddings=false — embed() returns EmbedUnsupported error,
1430        // which triggers the fallback path (create new entity).
1431        let sqlite2 = SqliteStore::new(":memory:").await.unwrap();
1432        let pool2 = sqlite2.pool().clone();
1433        let mem2 = Box::new(InMemoryVectorStore::new());
1434        let emb2 = Arc::new(EmbeddingStore::with_store(mem2, pool2));
1435        let gs2 = GraphStore::new(sqlite2.pool().clone());
1436
1437        let mut mock = zeph_llm::mock::MockProvider::default();
1438        mock.supports_embeddings = false;
1439        let any_provider = zeph_llm::any::AnyProvider::Mock(mock);
1440
1441        let resolver = EntityResolver::new(&gs2)
1442            .with_embedding_store(&emb2)
1443            .with_provider(&any_provider);
1444
1445        let (id, outcome) = resolver
1446            .resolve("testentity", "concept", Some("summary"))
1447            .await
1448            .unwrap();
1449        assert!(id > 0);
1450        assert_eq!(outcome, ResolutionOutcome::Created);
1451    }
1452
1453    #[tokio::test]
1454    async fn resolve_fallback_increments_counter() {
1455        let (gs, emb) = setup_with_embedding().await;
1456
1457        // Provider with embed that fails (supports_embeddings=false → EmbedUnsupported error)
1458        let mut mock = zeph_llm::mock::MockProvider::default();
1459        mock.supports_embeddings = false;
1460        let any_provider = zeph_llm::any::AnyProvider::Mock(mock);
1461
1462        let resolver = EntityResolver::new(&gs)
1463            .with_embedding_store(&emb)
1464            .with_provider(&any_provider);
1465
1466        let fallback_count = resolver.fallback_count();
1467
1468        // First call: embed fails → fallback
1469        resolver.resolve("entity_a", "concept", None).await.unwrap();
1470
1471        assert_eq!(
1472            fallback_count.load(std::sync::atomic::Ordering::Relaxed),
1473            1,
1474            "fallback counter should be 1 after embed failure"
1475        );
1476    }
1477
1478    #[tokio::test]
1479    async fn resolve_batch_processes_multiple_entities() {
1480        let gs = setup().await;
1481        let resolver = EntityResolver::new(&gs);
1482
1483        let entities = vec![
1484            ExtractedEntity {
1485                name: "rust".into(),
1486                entity_type: "language".into(),
1487                summary: Some("systems language".into()),
1488            },
1489            ExtractedEntity {
1490                name: "python".into(),
1491                entity_type: "language".into(),
1492                summary: None,
1493            },
1494            ExtractedEntity {
1495                name: "cargo".into(),
1496                entity_type: "tool".into(),
1497                summary: Some("rust build tool".into()),
1498            },
1499        ];
1500
1501        let results = resolver.resolve_batch(&entities).await.unwrap();
1502        assert_eq!(results.len(), 3);
1503        for (id, outcome) in &results {
1504            assert!(*id > 0);
1505            assert_eq!(*outcome, ResolutionOutcome::Created);
1506        }
1507    }
1508
1509    #[tokio::test]
1510    async fn resolve_batch_empty_returns_empty() {
1511        let gs = setup().await;
1512        let resolver = EntityResolver::new(&gs);
1513        let results = resolver.resolve_batch(&[]).await.unwrap();
1514        assert!(results.is_empty());
1515    }
1516
1517    #[tokio::test]
1518    async fn merge_combines_summaries() {
1519        let (gs, emb) = setup_with_embedding().await;
1520        // Use a different name for the existing entity so exact match doesn't trigger.
1521        // "mergetest v1" is stored with embedding; we then resolve "mergetest v2" which
1522        // embeds to the same vector → similarity = 1.0 > threshold → merge.
1523        let existing_id = gs
1524            .upsert_entity(
1525                "mergetest v1",
1526                "mergetest v1",
1527                EntityType::Concept,
1528                Some("first summary"),
1529            )
1530            .await
1531            .unwrap();
1532
1533        let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1534        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1535            .await
1536            .unwrap();
1537        let payload = serde_json::json!({
1538            "entity_id": existing_id,
1539            "name": "mergetest v1",
1540            "entity_type": "concept",
1541            "summary": "first summary",
1542        });
1543        let point_id = emb
1544            .store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1545            .await
1546            .unwrap();
1547        gs.set_entity_qdrant_point_id(existing_id, &point_id)
1548            .await
1549            .unwrap();
1550
1551        let provider = make_mock_provider_with_embedding(mock_vec);
1552        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1553
1554        let resolver = EntityResolver::new(&gs)
1555            .with_embedding_store(&emb)
1556            .with_provider(&any_provider)
1557            .with_thresholds(0.85, 0.70);
1558
1559        // Resolve "mergetest v2" — no exact match, but embedding is identical → merge
1560        let (id, outcome) = resolver
1561            .resolve("mergetest v2", "concept", Some("second summary"))
1562            .await
1563            .unwrap();
1564
1565        assert_eq!(id, existing_id);
1566        assert!(matches!(outcome, ResolutionOutcome::EmbeddingMatch { .. }));
1567
1568        // Verify the merged summary was updated on the existing entity
1569        let entity = gs
1570            .find_entity("mergetest v1", EntityType::Concept)
1571            .await
1572            .unwrap()
1573            .unwrap();
1574        let summary = entity.summary.unwrap_or_default();
1575        assert!(
1576            summary.contains("first summary") && summary.contains("second summary"),
1577            "merged summary should contain both: got {summary:?}"
1578        );
1579    }
1580
1581    #[tokio::test]
1582    async fn merge_preserves_older_entity_id() {
1583        let (gs, emb) = setup_with_embedding().await;
1584        // "legacy entity" stored with embedding; "legacy entity variant" has same vector → merge
1585        let existing_id = gs
1586            .upsert_entity(
1587                "legacy entity",
1588                "legacy entity",
1589                EntityType::Concept,
1590                Some("old info"),
1591            )
1592            .await
1593            .unwrap();
1594
1595        let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1596        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1597            .await
1598            .unwrap();
1599        let payload = serde_json::json!({
1600            "entity_id": existing_id,
1601            "name": "legacy entity",
1602            "entity_type": "concept",
1603            "summary": "old info",
1604        });
1605        emb.store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1606            .await
1607            .unwrap();
1608
1609        let provider = make_mock_provider_with_embedding(mock_vec);
1610        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1611
1612        let resolver = EntityResolver::new(&gs)
1613            .with_embedding_store(&emb)
1614            .with_provider(&any_provider)
1615            .with_thresholds(0.85, 0.70);
1616
1617        let (returned_id, _) = resolver
1618            .resolve("legacy entity variant", "concept", Some("new info"))
1619            .await
1620            .unwrap();
1621
1622        assert_eq!(
1623            returned_id, existing_id,
1624            "older entity ID should be preserved on merge"
1625        );
1626    }
1627
1628    #[tokio::test]
1629    async fn entity_type_filter_prevents_cross_type_merge() {
1630        let (gs, emb) = setup_with_embedding().await;
1631
1632        // Insert a Person named "python"
1633        let person_id = gs
1634            .upsert_entity(
1635                "python",
1636                "python",
1637                EntityType::Person,
1638                Some("a person named python"),
1639            )
1640            .await
1641            .unwrap();
1642
1643        let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1644        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1645            .await
1646            .unwrap();
1647        let payload = serde_json::json!({
1648            "entity_id": person_id,
1649            "name": "python",
1650            "entity_type": "person",
1651            "summary": "a person named python",
1652        });
1653        emb.store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1654            .await
1655            .unwrap();
1656
1657        let provider = make_mock_provider_with_embedding(mock_vec);
1658        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1659
1660        let resolver = EntityResolver::new(&gs)
1661            .with_embedding_store(&emb)
1662            .with_provider(&any_provider)
1663            .with_thresholds(0.85, 0.70);
1664
1665        // Resolve "python" as Language — should NOT merge with the Person entity
1666        let (lang_id, outcome) = resolver
1667            .resolve("python", "language", Some("python language"))
1668            .await
1669            .unwrap();
1670
1671        // The entity_type filter should prevent merging person "python" with language "python"
1672        // Check: either created new or an exact match was found under language type
1673        assert_ne!(
1674            lang_id, person_id,
1675            "language entity should not merge with person entity"
1676        );
1677        // The entity_type filter causes no embedding candidate to survive the type filter,
1678        // so resolution falls back to creating a new entity.
1679        assert_eq!(outcome, ResolutionOutcome::Created);
1680    }
1681
1682    #[tokio::test]
1683    async fn custom_thresholds_respected() {
1684        let (gs, emb) = setup_with_embedding().await;
1685        // With a very high threshold (1.0), even identical vectors won't merge
1686        // (they'd score exactly 1.0 which is NOT > 1.0, so... let's use 0.5 threshold
1687        // and verify score below 0.5 creates new)
1688        let existing_id = gs
1689            .upsert_entity(
1690                "threshold_test",
1691                "threshold_test",
1692                EntityType::Concept,
1693                Some("base"),
1694            )
1695            .await
1696            .unwrap();
1697
1698        let existing_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1699        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1700            .await
1701            .unwrap();
1702        let payload = serde_json::json!({
1703            "entity_id": existing_id,
1704            "name": "threshold_test",
1705            "entity_type": "concept",
1706            "summary": "base",
1707        });
1708        emb.store_to_collection(ENTITY_COLLECTION, payload, existing_vec)
1709            .await
1710            .unwrap();
1711
1712        // Orthogonal vector → score = 0.0
1713        let provider = make_mock_provider_with_embedding(vec![0.0, 1.0, 0.0, 0.0]);
1714        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1715
1716        // With thresholds 0.50/0.30, score=0 is below 0.30 → create new
1717        let resolver = EntityResolver::new(&gs)
1718            .with_embedding_store(&emb)
1719            .with_provider(&any_provider)
1720            .with_thresholds(0.50, 0.30);
1721
1722        let (id, outcome) = resolver
1723            .resolve("new_concept", "concept", Some("different"))
1724            .await
1725            .unwrap();
1726
1727        assert_ne!(id, existing_id);
1728        assert_eq!(outcome, ResolutionOutcome::Created);
1729    }
1730
1731    #[tokio::test]
1732    async fn resolve_outcome_exact_match_no_embedding_store() {
1733        let gs = setup().await;
1734        let resolver = EntityResolver::new(&gs);
1735
1736        resolver.resolve("existing", "concept", None).await.unwrap();
1737        let (_, outcome) = resolver.resolve("existing", "concept", None).await.unwrap();
1738        assert_eq!(outcome, ResolutionOutcome::ExactMatch);
1739    }
1740
// `extract_json` must yield text that deserializes into `DisambiguationResponse`
// both when the input is wrapped in a Markdown json fence and when it is bare JSON.
//
// This is a plain synchronous test: nothing in the body awaits, so it does not
// need an async runtime.
#[test]
fn extract_json_strips_markdown_fences() {
    // Fenced input: the JSON object sits between the opening and closing fence lines.
    let with_fence = "```json\n{\"same_entity\": true}\n```";
    let extracted = extract_json(with_fence);
    let parsed: DisambiguationResponse = serde_json::from_str(extracted).unwrap();
    assert!(parsed.same_entity);

    // Bare input: the result must still parse.
    let without_fence = "{\"same_entity\": false}";
    let extracted2 = extract_json(without_fence);
    let parsed2: DisambiguationResponse = serde_json::from_str(extracted2).unwrap();
    assert!(!parsed2.same_entity);
}
1753
1754    // Helper: build a MockProvider with embeddings enabled, given vector, and queued chat responses.
1755    fn make_mock_with_embedding_and_chat(
1756        embedding: Vec<f32>,
1757        chat_responses: Vec<String>,
1758    ) -> zeph_llm::mock::MockProvider {
1759        let mut p = zeph_llm::mock::MockProvider::with_responses(chat_responses);
1760        p.embedding = embedding;
1761        p.supports_embeddings = true;
1762        p
1763    }
1764
1765    // Seed an existing entity into both SQLite and InMemoryVectorStore at a known vector.
1766    async fn seed_entity_with_vector(
1767        gs: &GraphStore,
1768        emb: &Arc<EmbeddingStore>,
1769        name: &str,
1770        entity_type: EntityType,
1771        summary: &str,
1772        vector: Vec<f32>,
1773    ) -> i64 {
1774        let id = gs
1775            .upsert_entity(name, name, entity_type, Some(summary))
1776            .await
1777            .unwrap();
1778        emb.ensure_named_collection(ENTITY_COLLECTION, u64::try_from(vector.len()).unwrap())
1779            .await
1780            .unwrap();
1781        let payload = serde_json::json!({
1782            "entity_id": id,
1783            "name": name,
1784            "entity_type": entity_type.as_str(),
1785            "summary": summary,
1786        });
1787        let point_id = emb
1788            .store_to_collection(ENTITY_COLLECTION, payload, vector)
1789            .await
1790            .unwrap();
1791        gs.set_entity_qdrant_point_id(id, &point_id).await.unwrap();
1792        id
1793    }
1794
1795    // ── GAP-1: ambiguous score + LLM says same_entity=true → LlmDisambiguated ─
1796
1797    #[tokio::test]
1798    async fn resolve_ambiguous_score_llm_says_merge() {
1799        // existing entity at [1,0,0,0]; new entity embeds to [1,1,0,0] → cosine ≈ 0.707
1800        // thresholds: similarity=0.85, ambiguous=0.50 → score 0.707 is in [0.50, 0.85)
1801        let (gs, emb) = setup_with_embedding().await;
1802        let existing_id = seed_entity_with_vector(
1803            &gs,
1804            &emb,
1805            "goroutine",
1806            EntityType::Concept,
1807            "go concurrency primitive",
1808            vec![1.0, 0.0, 0.0, 0.0],
1809        )
1810        .await;
1811
1812        // LLM responds with same_entity=true → should merge
1813        let provider = make_mock_with_embedding_and_chat(
1814            vec![1.0, 1.0, 0.0, 0.0],
1815            vec![r#"{"same_entity": true}"#.to_owned()],
1816        );
1817        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1818
1819        let resolver = EntityResolver::new(&gs)
1820            .with_embedding_store(&emb)
1821            .with_provider(&any_provider)
1822            .with_thresholds(0.85, 0.50);
1823
1824        let (id, outcome) = resolver
1825            .resolve("goroutine concurrency", "concept", Some("go concurrency"))
1826            .await
1827            .unwrap();
1828
1829        assert_eq!(
1830            id, existing_id,
1831            "should return existing entity ID on LLM merge"
1832        );
1833        assert_eq!(outcome, ResolutionOutcome::LlmDisambiguated);
1834    }
1835
1836    // ── GAP-2: ambiguous score + LLM says same_entity=false → Created ──────────
1837
1838    #[tokio::test]
1839    async fn resolve_ambiguous_score_llm_says_different() {
1840        let (gs, emb) = setup_with_embedding().await;
1841        let existing_id = seed_entity_with_vector(
1842            &gs,
1843            &emb,
1844            "channel",
1845            EntityType::Concept,
1846            "go channel",
1847            vec![1.0, 0.0, 0.0, 0.0],
1848        )
1849        .await;
1850
1851        // LLM responds with same_entity=false → should create new entity
1852        let provider = make_mock_with_embedding_and_chat(
1853            vec![1.0, 1.0, 0.0, 0.0],
1854            vec![r#"{"same_entity": false}"#.to_owned()],
1855        );
1856        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1857
1858        let resolver = EntityResolver::new(&gs)
1859            .with_embedding_store(&emb)
1860            .with_provider(&any_provider)
1861            .with_thresholds(0.85, 0.50);
1862
1863        let (id, outcome) = resolver
1864            .resolve("network channel", "concept", Some("networking channel"))
1865            .await
1866            .unwrap();
1867
1868        assert_ne!(
1869            id, existing_id,
1870            "LLM-rejected match should create new entity"
1871        );
1872        assert_eq!(outcome, ResolutionOutcome::Created);
1873    }
1874
1875    // ── GAP-3: ambiguous score + LLM chat fails → fallback counter incremented ─
1876
1877    #[tokio::test]
1878    async fn resolve_ambiguous_score_llm_failure_increments_fallback() {
1879        let (gs, emb) = setup_with_embedding().await;
1880        let existing_id = seed_entity_with_vector(
1881            &gs,
1882            &emb,
1883            "mutex",
1884            EntityType::Concept,
1885            "mutual exclusion lock",
1886            vec![1.0, 0.0, 0.0, 0.0],
1887        )
1888        .await;
1889
1890        // fail_chat=true → provider.chat() returns Err → None from llm_disambiguate
1891        let mut provider = make_mock_with_embedding_and_chat(vec![1.0, 1.0, 0.0, 0.0], vec![]);
1892        provider.fail_chat = true;
1893        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1894
1895        let resolver = EntityResolver::new(&gs)
1896            .with_embedding_store(&emb)
1897            .with_provider(&any_provider)
1898            .with_thresholds(0.85, 0.50);
1899
1900        let fallback_count = resolver.fallback_count();
1901
1902        let (id, outcome) = resolver
1903            .resolve("mutex lock", "concept", Some("synchronization primitive"))
1904            .await
1905            .unwrap();
1906
1907        // LLM failure → fallback to create new
1908        assert_ne!(
1909            id, existing_id,
1910            "LLM failure should create new entity (fallback)"
1911        );
1912        assert_eq!(outcome, ResolutionOutcome::Created);
1913        assert_eq!(
1914            fallback_count.load(std::sync::atomic::Ordering::Relaxed),
1915            1,
1916            "fallback counter should be incremented on LLM chat failure"
1917        );
1918    }
1919
1920    // ── Canonicalization / alias tests ────────────────────────────────────────
1921
1922    #[tokio::test]
1923    async fn resolve_creates_entity_with_canonical_name() {
1924        let gs = setup().await;
1925        let resolver = EntityResolver::new(&gs);
1926        let (id, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1927        assert!(id > 0);
1928        let entity = gs
1929            .find_entity("rust", EntityType::Language)
1930            .await
1931            .unwrap()
1932            .unwrap();
1933        assert_eq!(entity.canonical_name, "rust");
1934    }
1935
1936    #[tokio::test]
1937    async fn resolve_adds_alias_on_create() {
1938        let gs = setup().await;
1939        let resolver = EntityResolver::new(&gs);
1940        let (id, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1941        let aliases = gs.aliases_for_entity(id).await.unwrap();
1942        assert!(
1943            !aliases.is_empty(),
1944            "new entity should have at least one alias"
1945        );
1946        assert!(aliases.iter().any(|a| a.alias_name == "rust"));
1947    }
1948
1949    #[tokio::test]
1950    async fn resolve_reuses_entity_by_alias() {
1951        let gs = setup().await;
1952        let resolver = EntityResolver::new(&gs);
1953
1954        // Create entity and register an alias
1955        let (id1, _) = resolver.resolve("rust", "language", None).await.unwrap();
1956        gs.add_alias(id1, "rust-lang").await.unwrap();
1957
1958        // Resolve using the alias — should return the same entity
1959        let (id2, _) = resolver
1960            .resolve("rust-lang", "language", None)
1961            .await
1962            .unwrap();
1963        assert_eq!(
1964            id1, id2,
1965            "'rust-lang' alias should resolve to same entity as 'rust'"
1966        );
1967    }
1968
1969    #[tokio::test]
1970    async fn resolve_alias_match_respects_entity_type() {
1971        let gs = setup().await;
1972        let resolver = EntityResolver::new(&gs);
1973
1974        // "python" as a Language
1975        let (lang_id, _) = resolver.resolve("python", "language", None).await.unwrap();
1976
1977        // "python" as a Tool should create a separate entity (different type)
1978        let (tool_id, _) = resolver.resolve("python", "tool", None).await.unwrap();
1979        assert_ne!(
1980            lang_id, tool_id,
1981            "same name with different type should be separate entities"
1982        );
1983    }
1984
1985    #[tokio::test]
1986    async fn resolve_preserves_existing_aliases() {
1987        let gs = setup().await;
1988        let resolver = EntityResolver::new(&gs);
1989
1990        let (id, _) = resolver.resolve("rust", "language", None).await.unwrap();
1991        gs.add_alias(id, "rust-lang").await.unwrap();
1992
1993        // Upserting same entity should not remove prior aliases
1994        resolver
1995            .resolve("rust", "language", Some("updated"))
1996            .await
1997            .unwrap();
1998        let aliases = gs.aliases_for_entity(id).await.unwrap();
1999        assert!(
2000            aliases.iter().any(|a| a.alias_name == "rust-lang"),
2001            "prior alias must be preserved"
2002        );
2003    }
2004
2005    #[tokio::test]
2006    async fn resolve_original_form_registered_as_alias() {
2007        let gs = setup().await;
2008        let resolver = EntityResolver::new(&gs);
2009
2010        // "  Rust  " — original trimmed lowercased form is "rust", same as normalized
2011        // So only one alias should be registered (no duplicate)
2012        let (id, _) = resolver
2013            .resolve("  Rust  ", "language", None)
2014            .await
2015            .unwrap();
2016        let aliases = gs.aliases_for_entity(id).await.unwrap();
2017        assert!(aliases.iter().any(|a| a.alias_name == "rust"));
2018    }
2019
2020    #[tokio::test]
2021    async fn resolve_entity_with_many_aliases() {
2022        let gs = setup().await;
2023        let id = gs
2024            .upsert_entity("bigentity", "bigentity", EntityType::Concept, None)
2025            .await
2026            .unwrap();
2027        for i in 0..100 {
2028            gs.add_alias(id, &format!("alias-{i}")).await.unwrap();
2029        }
2030        let aliases = gs.aliases_for_entity(id).await.unwrap();
2031        assert_eq!(aliases.len(), 100);
2032
2033        // Fuzzy search should still work via alias
2034        let results = gs.find_entities_fuzzy("alias-50", 10).await.unwrap();
2035        assert!(results.iter().any(|e| e.id == id));
2036    }
2037}