Skip to main content

zeph_memory/graph/
resolver.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_llm::any::AnyProvider;
12use zeph_llm::provider::{LlmProvider as _, Message, Role};
13
14use super::store::GraphStore;
15use super::types::EntityType;
16use crate::embedding_store::EmbeddingStore;
17use crate::error::MemoryError;
18use crate::graph::extractor::ExtractedEntity;
19use crate::types::MessageId;
20use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
21
/// Maximum byte length for entity names stored in the graph.
///
/// Applied by name normalization and alias registration via `truncate_to_bytes`.
const MAX_ENTITY_NAME_BYTES: usize = 512;
/// Maximum byte length for relation strings.
const MAX_RELATION_BYTES: usize = 256;
/// Maximum byte length for fact strings.
///
/// Also used in this file as the cap for entity summaries before embedding and
/// for merged summaries.
const MAX_FACT_BYTES: usize = 2048;

/// Qdrant collection for entity embeddings.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Timeout for a single `embed()` call in seconds.
const EMBED_TIMEOUT_SECS: u64 = 30;
34
/// Remove control characters and Unicode `BiDi` embedding/override/isolate codepoints.
///
/// "Control" follows [`char::is_control`], i.e. the whole Unicode `Cc` category
/// (C0 and C1 controls), not just the ASCII range. The `BiDi` codepoints removed
/// are U+202A..=U+202E and U+2066..=U+2069. All other characters are kept in order.
fn strip_control_chars(s: &str) -> String {
    let mut kept = String::with_capacity(s.len());
    for ch in s.chars() {
        let is_bidi_control = matches!(ch, '\u{202A}'..='\u{202E}' | '\u{2066}'..='\u{2069}');
        if ch.is_control() || is_bidi_control {
            continue;
        }
        kept.push(ch);
    }
    kept
}
41
/// Return the longest prefix of `s` that is at most `max_bytes` bytes long and
/// ends on a UTF-8 character boundary.
///
/// Strings that already fit are returned unchanged. A multi-byte character that
/// would straddle the limit is dropped entirely rather than split.
fn truncate_to_bytes(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    // Index 0 is always a char boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
53
/// Outcome of an entity resolution attempt.
///
/// Returned alongside the entity id so callers can tell how the id was obtained.
#[derive(Debug, Clone, PartialEq)]
pub enum ResolutionOutcome {
    /// Exact name+type match in `SQLite` (via alias lookup or canonical-name lookup).
    ExactMatch,
    /// Cosine similarity >= merge threshold; score is the cosine similarity value.
    EmbeddingMatch { score: f32 },
    /// LLM confirmed merge in ambiguous similarity range.
    LlmDisambiguated,
    /// New entity was created.
    Created,
}
66
/// LLM response for entity disambiguation.
///
/// Deserialized from the JSON object the model is asked to return.
#[derive(Debug, Deserialize, JsonSchema)]
struct DisambiguationResponse {
    // `true` means the new mention and the existing entity should be merged.
    same_entity: bool,
}
72
/// Per-entity-name lock guard to prevent concurrent duplicate creation.
///
/// Keyed by normalized entity name. Entities with different names resolve concurrently;
/// entities with the same name are serialized.
///
/// Entries are only ever inserted by this file; nothing here removes them.
///
/// TODO(SEC-M33-02): This map grows unboundedly — one entry per unique normalized name.
/// For a short-lived resolver this is acceptable. If the resolver becomes long-lived
/// (stored in `SemanticMemory`), add eviction or use a fixed-size sharded lock array.
type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
82
/// Resolves extracted entity mentions and edges against the graph store.
///
/// Exact (alias / canonical-name) matching always runs. Embedding-based fuzzy
/// matching runs only when both an embedding store and a provider are configured.
pub struct EntityResolver<'a> {
    /// Graph store used for entity/alias/edge lookups and writes.
    store: &'a GraphStore,
    /// Optional vector store; required (together with `provider`) for fuzzy matching.
    embedding_store: Option<&'a Arc<EmbeddingStore>>,
    /// Optional LLM provider used for `embed()` and disambiguation chat calls.
    provider: Option<&'a AnyProvider>,
    /// Scores at or above this value merge into the best candidate without asking the LLM.
    similarity_threshold: f32,
    /// Scores at or above this value (but below `similarity_threshold`) go to LLM disambiguation.
    ambiguous_threshold: f32,
    /// Per-normalized-name async locks serializing resolution of identical names.
    name_locks: NameLockMap,
    /// Counter for error-triggered fallbacks (embed/LLM failures). Tests can read this via Arc.
    fallback_count: Arc<std::sync::atomic::AtomicU64>,
}
93
94impl<'a> EntityResolver<'a> {
95    #[must_use]
96    pub fn new(store: &'a GraphStore) -> Self {
97        Self {
98            store,
99            embedding_store: None,
100            provider: None,
101            similarity_threshold: 0.85,
102            ambiguous_threshold: 0.70,
103            name_locks: Arc::new(DashMap::new()),
104            fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
105        }
106    }
107
108    #[must_use]
109    pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
110        self.embedding_store = Some(store);
111        self
112    }
113
114    #[must_use]
115    pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
116        self.provider = Some(provider);
117        self
118    }
119
120    #[must_use]
121    pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
122        self.similarity_threshold = similarity;
123        self.ambiguous_threshold = ambiguous;
124        self
125    }
126
127    /// Shared fallback counter — tests can clone this Arc to inspect the value.
128    #[must_use]
129    pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
130        Arc::clone(&self.fallback_count)
131    }
132
133    /// Normalize an entity name: trim, lowercase, strip control chars, truncate.
134    fn normalize_name(name: &str) -> String {
135        let lowered = name.trim().to_lowercase();
136        let cleaned = strip_control_chars(&lowered);
137        let normalized = truncate_to_bytes(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
138        if normalized.len() < cleaned.len() {
139            tracing::debug!(
140                "graph resolver: entity name truncated to {} bytes",
141                MAX_ENTITY_NAME_BYTES
142            );
143        }
144        normalized
145    }
146
147    /// Parse an entity type string, falling back to `Concept` on unknown values.
148    fn parse_entity_type(entity_type: &str) -> EntityType {
149        entity_type
150            .trim()
151            .to_lowercase()
152            .parse::<EntityType>()
153            .unwrap_or_else(|_| {
154                tracing::debug!(
155                    "graph resolver: unknown entity type {:?}, falling back to Concept",
156                    entity_type
157                );
158                EntityType::Concept
159            })
160    }
161
162    /// Acquire the per-name lock and return the guard. Keeps lock alive for the caller.
163    async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
164        let lock = self
165            .name_locks
166            .entry(normalized.to_owned())
167            .or_insert_with(|| Arc::new(Mutex::new(())))
168            .clone();
169        lock.lock_owned().await
170    }
171
172    /// Resolve an extracted entity using the alias-first canonicalization pipeline.
173    ///
174    /// Pipeline:
175    /// 1. Normalize: trim, lowercase, strip control chars, truncate to 512 bytes.
176    /// 2. Parse entity type (fallback to Concept on unknown).
177    /// 3. Alias lookup: search `graph_entity_aliases` by normalized name + `entity_type`.
178    ///    If found, touch `last_seen_at` and return the existing entity id.
179    /// 4. Canonical name lookup: search `graph_entities` by `canonical_name` + `entity_type`.
180    ///    If found, touch `last_seen_at` and return the existing entity id.
181    /// 5. When `embedding_store` and `provider` are configured, performs embedding-based fuzzy
182    ///    matching: cosine similarity search (Qdrant), LLM disambiguation for ambiguous range,
183    ///    merge or create based on result. Failures degrade gracefully to step 6.
184    /// 6. Create: upsert new entity with `canonical_name` = normalized name.
185    /// 7. Register the normalized form (and original trimmed form if different) as aliases.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if the entity name is empty after normalization, or if a DB operation fails.
190    pub async fn resolve(
191        &self,
192        name: &str,
193        entity_type: &str,
194        summary: Option<&str>,
195    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
196        let normalized = Self::normalize_name(name);
197
198        if normalized.is_empty() {
199            return Err(MemoryError::GraphStore("empty entity name".into()));
200        }
201
202        let et = Self::parse_entity_type(entity_type);
203
204        // The surface form preserves the original casing for user-facing display.
205        let surface_name = name.trim().to_owned();
206
207        // Acquire per-name lock to prevent concurrent duplicate creation.
208        let _guard = self.lock_name(&normalized).await;
209
210        // Step 3: alias-first lookup (filters by entity_type to prevent cross-type collisions).
211        if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
212            self.store
213                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
214                .await?;
215            return Ok((entity.id, ResolutionOutcome::ExactMatch));
216        }
217
218        // Step 4: canonical name lookup.
219        if let Some(entity) = self.store.find_entity(&normalized, et).await? {
220            self.store
221                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
222                .await?;
223            return Ok((entity.id, ResolutionOutcome::ExactMatch));
224        }
225
226        // Step 5: Embedding-based resolution (when configured).
227        if let Some(outcome) = self
228            .resolve_via_embedding(&normalized, name, &surface_name, et, summary)
229            .await?
230        {
231            return Ok(outcome);
232        }
233
234        // Step 6: Create new entity (no embedding store, or embedding failure).
235        let entity_id = self
236            .store
237            .upsert_entity(&surface_name, &normalized, et, summary)
238            .await?;
239
240        self.register_aliases(entity_id, &normalized, name).await?;
241
242        Ok((entity_id, ResolutionOutcome::Created))
243    }
244
245    /// Compute embedding for an entity, incrementing `fallback_count` on failure/timeout.
246    /// Returns `None` when embedding is unavailable (caller should skip vector operations).
247    async fn embed_entity_text(
248        &self,
249        provider: &AnyProvider,
250        normalized: &str,
251        summary: Option<&str>,
252    ) -> Option<Vec<f32>> {
253        let safe_summary = truncate_to_bytes(summary.unwrap_or(""), MAX_FACT_BYTES);
254        let embed_text = format!("{normalized}: {safe_summary}");
255        let embed_result = tokio::time::timeout(
256            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
257            provider.embed(&embed_text),
258        )
259        .await;
260        match embed_result {
261            Ok(Ok(v)) => Some(v),
262            Ok(Err(err)) => {
263                self.fallback_count
264                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
265                tracing::warn!(entity_name = %normalized, error = %err,
266                    "embed() failed; falling back to exact-match-only entity creation");
267                None
268            }
269            Err(_timeout) => {
270                self.fallback_count
271                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
272                tracing::warn!(entity_name = %normalized,
273                    "embed() timed out after {}s; falling back to create new entity",
274                    EMBED_TIMEOUT_SECS);
275                None
276            }
277        }
278    }
279
280    /// Handle a candidate in the ambiguous score range by running LLM disambiguation.
281    /// Returns `Ok(Some(...))` if the LLM confirms a match, `Ok(None)` to fall through to create.
282    #[allow(clippy::too_many_arguments)]
283    async fn handle_ambiguous_candidate(
284        &self,
285        emb_store: &EmbeddingStore,
286        provider: &AnyProvider,
287        payload: &std::collections::HashMap<String, serde_json::Value>,
288        score: f32,
289        surface_name: &str,
290        normalized: &str,
291        et: EntityType,
292        summary: Option<&str>,
293    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
294        let entity_id = payload
295            .get("entity_id")
296            .and_then(serde_json::Value::as_i64)
297            .ok_or_else(|| MemoryError::GraphStore("missing entity_id in payload".into()))?;
298        let existing_name = payload
299            .get("name")
300            .and_then(|v| v.as_str())
301            .unwrap_or("")
302            .to_owned();
303        let existing_summary = payload
304            .get("summary")
305            .and_then(|v| v.as_str())
306            .unwrap_or("")
307            .to_owned();
308        // Use the existing entity's actual type from the payload (IC-S3)
309        let existing_type = payload
310            .get("entity_type")
311            .and_then(|v| v.as_str())
312            .unwrap_or(et.as_str())
313            .to_owned();
314        match self
315            .llm_disambiguate(
316                provider,
317                normalized,
318                et.as_str(),
319                summary.unwrap_or(""),
320                &existing_name,
321                &existing_type,
322                &existing_summary,
323                score,
324            )
325            .await
326        {
327            Some(true) => {
328                self.merge_entity(
329                    emb_store,
330                    provider,
331                    entity_id,
332                    surface_name,
333                    normalized,
334                    et,
335                    summary,
336                )
337                .await?;
338                Ok(Some((entity_id, ResolutionOutcome::LlmDisambiguated)))
339            }
340            Some(false) => Ok(None),
341            None => {
342                self.fallback_count
343                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
344                tracing::warn!(entity_name = %normalized,
345                    "LLM disambiguation failed; falling back to create new entity");
346                Ok(None)
347            }
348        }
349    }
350
351    /// Attempt embedding-based resolution. Returns `Ok(Some(...))` if resolved (early return),
352    /// `Ok(None)` if no match found (caller should fall through to create), or `Err` on DB error.
353    async fn resolve_via_embedding(
354        &self,
355        normalized: &str,
356        original_name: &str,
357        surface_name: &str,
358        et: EntityType,
359        summary: Option<&str>,
360    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
361        let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) else {
362            return Ok(None);
363        };
364
365        let Some(query_vec) = self.embed_entity_text(provider, normalized, summary).await else {
366            return Ok(None);
367        };
368
369        let type_filter = VectorFilter {
370            must: vec![FieldCondition {
371                field: "entity_type".into(),
372                value: FieldValue::Text(et.as_str().to_owned()),
373            }],
374            must_not: vec![],
375        };
376        let candidates = match emb_store
377            .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
378            .await
379        {
380            Ok(c) => c,
381            Err(err) => {
382                self.fallback_count
383                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
384                tracing::warn!(entity_name = %normalized, error = %err,
385                    "Qdrant search failed; falling back to create new entity");
386                return self
387                    .create_with_embedding(
388                        emb_store,
389                        surface_name,
390                        normalized,
391                        original_name,
392                        et,
393                        summary,
394                        &query_vec,
395                    )
396                    .await
397                    .map(Some);
398            }
399        };
400
401        if let Some(best) = candidates.first() {
402            let score = best.score;
403            if score >= self.similarity_threshold {
404                let entity_id = best
405                    .payload
406                    .get("entity_id")
407                    .and_then(serde_json::Value::as_i64)
408                    .ok_or_else(|| {
409                        MemoryError::GraphStore("missing entity_id in payload".into())
410                    })?;
411                self.merge_entity(
412                    emb_store,
413                    provider,
414                    entity_id,
415                    surface_name,
416                    normalized,
417                    et,
418                    summary,
419                )
420                .await?;
421                return Ok(Some((
422                    entity_id,
423                    ResolutionOutcome::EmbeddingMatch { score },
424                )));
425            } else if score >= self.ambiguous_threshold
426                && let Some(result) = self
427                    .handle_ambiguous_candidate(
428                        emb_store,
429                        provider,
430                        &best.payload,
431                        score,
432                        surface_name,
433                        normalized,
434                        et,
435                        summary,
436                    )
437                    .await?
438            {
439                return Ok(Some(result));
440            }
441            // score < ambiguous_threshold or LLM said different: fall through to create with embedding
442        }
443
444        // No suitable match — create new entity and store embedding.
445        self.create_with_embedding(
446            emb_store,
447            surface_name,
448            normalized,
449            original_name,
450            et,
451            summary,
452            &query_vec,
453        )
454        .await
455        .map(Some)
456    }
457
458    /// Create a new entity, register aliases, and store its embedding in Qdrant.
459    #[allow(clippy::too_many_arguments)]
460    async fn create_with_embedding(
461        &self,
462        emb_store: &EmbeddingStore,
463        surface_name: &str,
464        normalized: &str,
465        original_name: &str,
466        et: EntityType,
467        summary: Option<&str>,
468        query_vec: &[f32],
469    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
470        let entity_id = self
471            .store
472            .upsert_entity(surface_name, normalized, et, summary)
473            .await?;
474        self.register_aliases(entity_id, normalized, original_name)
475            .await?;
476        self.store_entity_embedding(
477            emb_store,
478            entity_id,
479            None,
480            normalized,
481            et,
482            summary.unwrap_or(""),
483            query_vec,
484        )
485        .await;
486        Ok((entity_id, ResolutionOutcome::Created))
487    }
488
489    /// Register the normalized form and original trimmed form as aliases for an entity.
490    async fn register_aliases(
491        &self,
492        entity_id: i64,
493        normalized: &str,
494        original_name: &str,
495    ) -> Result<(), MemoryError> {
496        self.store.add_alias(entity_id, normalized).await?;
497
498        // Also register the original trimmed lowercased form if it differs from normalized
499        // (e.g. when control chars were stripped, leaving a shorter string).
500        let original_trimmed = original_name.trim().to_lowercase();
501        let original_clean_str = strip_control_chars(&original_trimmed);
502        let original_clean = truncate_to_bytes(&original_clean_str, MAX_ENTITY_NAME_BYTES);
503        if original_clean != normalized {
504            self.store.add_alias(entity_id, original_clean).await?;
505        }
506
507        Ok(())
508    }
509
510    /// Merge an existing entity with new information: combine summaries, update Qdrant.
511    #[allow(clippy::too_many_arguments)]
512    async fn merge_entity(
513        &self,
514        emb_store: &EmbeddingStore,
515        provider: &AnyProvider,
516        entity_id: i64,
517        new_surface_name: &str,
518        new_canonical_name: &str,
519        entity_type: EntityType,
520        new_summary: Option<&str>,
521    ) -> Result<(), MemoryError> {
522        // TODO(PERF-03): The Qdrant payload already contains name/summary at the call site;
523        // pass them in as parameters to eliminate this extra SQLite roundtrip per merge.
524        let existing = self.store.find_entity_by_id(entity_id).await?;
525        let existing_summary = existing
526            .as_ref()
527            .and_then(|e| e.summary.as_deref())
528            .unwrap_or("");
529
530        let merged_summary = if let Some(new) = new_summary {
531            if !new.is_empty() && !existing_summary.is_empty() {
532                let combined = format!("{existing_summary}; {new}");
533                // TODO(S2): use LLM-based summary merge when summary exceeds 512 bytes
534                truncate_to_bytes(&combined, MAX_FACT_BYTES).to_owned()
535            } else if !new.is_empty() {
536                new.to_owned()
537            } else {
538                existing_summary.to_owned()
539            }
540        } else {
541            existing_summary.to_owned()
542        };
543
544        let summary_opt = if merged_summary.is_empty() {
545            None
546        } else {
547            Some(merged_summary.as_str())
548        };
549
550        // Update the EXISTING entity's summary (keep its canonical_name, update surface display name).
551        let existing_canonical = existing.as_ref().map_or_else(
552            || new_canonical_name.to_owned(),
553            |e| e.canonical_name.clone(),
554        );
555        let existing_name_owned = existing
556            .as_ref()
557            .map_or_else(|| new_surface_name.to_owned(), |e| e.name.clone());
558        self.store
559            .upsert_entity(
560                &existing_name_owned,
561                &existing_canonical,
562                entity_type,
563                summary_opt,
564            )
565            .await?;
566
567        // Retrieve existing qdrant_point_id to reuse it (avoids orphaned stale points, IC-S1)
568        let existing_point_id = existing
569            .as_ref()
570            .and_then(|e| e.qdrant_point_id.as_deref())
571            .map(ToOwned::to_owned);
572
573        // Re-embed merged text and upsert to Qdrant
574        let embed_text = format!("{existing_name_owned}: {merged_summary}");
575        let embed_result = tokio::time::timeout(
576            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
577            provider.embed(&embed_text),
578        )
579        .await;
580
581        match embed_result {
582            Ok(Ok(vec)) => {
583                self.store_entity_embedding(
584                    emb_store,
585                    entity_id,
586                    existing_point_id.as_deref(),
587                    &existing_name_owned,
588                    entity_type,
589                    &merged_summary,
590                    &vec,
591                )
592                .await;
593            }
594            Ok(Err(err)) => {
595                tracing::warn!(
596                    entity_id,
597                    error = %err,
598                    "merge re-embed failed; Qdrant entry may be stale"
599                );
600            }
601            Err(_) => {
602                tracing::warn!(
603                    entity_id,
604                    "merge re-embed timed out; Qdrant entry may be stale"
605                );
606            }
607        }
608
609        Ok(())
610    }
611
612    /// Store an entity embedding in Qdrant and update `qdrant_point_id` in `SQLite`.
613    ///
614    /// When `existing_point_id` is `Some`, the existing Qdrant point is updated in-place
615    /// (upsert by ID) to avoid orphaned stale points. When `None`, a new point is created.
616    ///
617    /// Failures are logged at warn level but do not propagate — the entity is still
618    /// valid in `SQLite` even if Qdrant upsert fails.
619    #[allow(clippy::too_many_arguments)]
620    async fn store_entity_embedding(
621        &self,
622        emb_store: &EmbeddingStore,
623        entity_id: i64,
624        existing_point_id: Option<&str>,
625        name: &str,
626        entity_type: EntityType,
627        summary: &str,
628        vector: &[f32],
629    ) {
630        // TODO(PERF-05): ensure_named_collection() is called on every store_entity_embedding()
631        // invocation, generating one Qdrant network roundtrip per entity in a batch. Cache this
632        // result at resolver construction time via `std::sync::OnceLock<bool>` to call it once.
633        let vector_size = u64::try_from(vector.len()).unwrap_or(384);
634        if let Err(err) = emb_store
635            .ensure_named_collection(ENTITY_COLLECTION, vector_size)
636            .await
637        {
638            tracing::error!(
639                error = %err,
640                "failed to ensure entity embedding collection; skipping Qdrant upsert"
641            );
642            return;
643        }
644
645        let payload = serde_json::json!({
646            "entity_id": entity_id,
647            "name": name,
648            "entity_type": entity_type.as_str(),
649            "summary": summary,
650        });
651
652        if let Some(point_id) = existing_point_id {
653            // Reuse existing point to avoid orphaned stale points (IC-S1)
654            if let Err(err) = emb_store
655                .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
656                .await
657            {
658                tracing::warn!(
659                    entity_id,
660                    error = %err,
661                    "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
662                );
663            }
664        } else {
665            match emb_store
666                .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
667                .await
668            {
669                Ok(point_id) => {
670                    if let Err(err) = self
671                        .store
672                        .set_entity_qdrant_point_id(entity_id, &point_id)
673                        .await
674                    {
675                        tracing::warn!(
676                            entity_id,
677                            error = %err,
678                            "failed to store qdrant_point_id in SQLite"
679                        );
680                    }
681                }
682                Err(err) => {
683                    tracing::warn!(
684                        entity_id,
685                        error = %err,
686                        "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
687                    );
688                }
689            }
690        }
691    }
692
693    /// Ask the LLM whether two entities are the same.
694    ///
695    /// Returns `Some(true)` for merge, `Some(false)` for separate, `None` on failure.
696    #[allow(clippy::too_many_arguments)]
697    async fn llm_disambiguate(
698        &self,
699        provider: &AnyProvider,
700        new_name: &str,
701        new_type: &str,
702        new_summary: &str,
703        existing_name: &str,
704        existing_type: &str,
705        existing_summary: &str,
706        score: f32,
707    ) -> Option<bool> {
708        let prompt = format!(
709            "New entity:\n\
710             - Name: <external-data>{new_name}</external-data>\n\
711             - Type: <external-data>{new_type}</external-data>\n\
712             - Summary: <external-data>{new_summary}</external-data>\n\
713             \n\
714             Existing entity:\n\
715             - Name: <external-data>{existing_name}</external-data>\n\
716             - Type: <external-data>{existing_type}</external-data>\n\
717             - Summary: <external-data>{existing_summary}</external-data>\n\
718             \n\
719             Cosine similarity: {score:.2}\n\
720             \n\
721             Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
722        );
723
724        let messages = [
725            Message::from_legacy(
726                Role::System,
727                "You are an entity disambiguation assistant. Given a new entity mention and \
728                 an existing entity from the knowledge graph, determine if they refer to the same \
729                 real-world entity. Respond only with JSON.",
730            ),
731            Message::from_legacy(Role::User, prompt),
732        ];
733
734        let response = match provider.chat(&messages).await {
735            Ok(r) => r,
736            Err(err) => {
737                tracing::warn!(error = %err, "LLM disambiguation chat failed");
738                return None;
739            }
740        };
741
742        // Parse JSON response, tolerating markdown code fences
743        let json_str = extract_json(&response);
744        match serde_json::from_str::<DisambiguationResponse>(json_str) {
745            Ok(parsed) => Some(parsed.same_entity),
746            Err(err) => {
747                tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
748                None
749            }
750        }
751    }
752
753    /// Resolve a batch of extracted entities concurrently.
754    ///
755    /// Returns a `Vec` of `(entity_id, ResolutionOutcome)` in the same order as input.
756    ///
757    /// # Errors
758    ///
759    /// Returns an error if any DB operation fails.
760    ///
761    /// # Panics
762    ///
763    /// Panics if an internal stream collection bug causes a result index to be missing.
764    /// This indicates a programming error and should never occur in correct usage.
765    pub async fn resolve_batch(
766        &self,
767        entities: &[ExtractedEntity],
768    ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
769        if entities.is_empty() {
770            return Ok(Vec::new());
771        }
772
773        // Process up to 4 embed+resolve operations concurrently (IC-S2/PERF-01).
774        let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
775
776        let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
777            let name = e.name.clone();
778            let entity_type = e.entity_type.clone();
779            let summary = e.summary.clone();
780            async move {
781                let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
782                (i, result)
783            }
784        }))
785        .buffer_unordered(4);
786
787        while let Some((i, result)) = stream.next().await {
788            match result {
789                Ok(outcome) => results[i] = Some(outcome),
790                Err(err) => return Err(err),
791            }
792        }
793
794        Ok(results
795            .into_iter()
796            .enumerate()
797            .map(|(i, r)| {
798                r.unwrap_or_else(|| {
799                    tracing::warn!(
800                        index = i,
801                        "resolve_batch: missing result at index — bug in stream collection"
802                    );
803                    panic!("resolve_batch: missing result at index {i}")
804                })
805            })
806            .collect())
807    }
808
809    /// Resolve an extracted edge: deduplicate or supersede existing edges.
810    ///
811    /// - If an active edge with the same direction and relation exists with an identical fact,
812    ///   returns `None` (deduplicated).
813    /// - If an active edge with the same direction and relation exists with a different fact,
814    ///   invalidates the old edge and inserts the new one, returning `Some(new_id)`.
815    /// - If no matching edge exists, inserts a new edge and returns `Some(new_id)`.
816    ///
817    /// Relation and fact strings are sanitized (control chars stripped, length-capped).
818    ///
819    /// # Errors
820    ///
821    /// Returns an error if any database operation fails.
822    pub async fn resolve_edge(
823        &self,
824        source_id: i64,
825        target_id: i64,
826        relation: &str,
827        fact: &str,
828        confidence: f32,
829        episode_id: Option<MessageId>,
830    ) -> Result<Option<i64>, MemoryError> {
831        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
832        let normalized_relation = truncate_to_bytes(&relation_clean, MAX_RELATION_BYTES).to_owned();
833
834        let fact_clean = strip_control_chars(fact.trim());
835        let normalized_fact = truncate_to_bytes(&fact_clean, MAX_FACT_BYTES).to_owned();
836
837        // Fetch only exact-direction edges — no reverse edges to filter out
838        let existing_edges = self.store.edges_exact(source_id, target_id).await?;
839
840        let matching = existing_edges
841            .iter()
842            .find(|e| e.relation == normalized_relation);
843
844        if let Some(old) = matching {
845            if old.fact == normalized_fact {
846                // Exact duplicate — skip
847                return Ok(None);
848            }
849            // Same relation, different fact — supersede
850            self.store.invalidate_edge(old.id).await?;
851        }
852
853        let new_id = self
854            .store
855            .insert_edge(
856                source_id,
857                target_id,
858                &normalized_relation,
859                &normalized_fact,
860                confidence,
861                episode_id,
862            )
863            .await?;
864        Ok(Some(new_id))
865    }
866}
867
/// Extract a JSON object from a string that may contain markdown code fences.
///
/// Resolution order:
///
/// 1. If the trimmed input starts with a ``` fence and contains a closing ``` fence,
///    only the text between them is considered. Inside the fence the slice from the
///    first `{` to the last `}` is returned, which discards any info string on the
///    opening fence (`json`, `JSON`, `jsonc`, ...). When the fenced text contains no
///    brace pair, it is returned trimmed with a leading lowercase `json` tag removed.
/// 2. Otherwise the slice from the first `{` to the last `}` of the trimmed input is
///    returned.
/// 3. If no brace pair is found, the trimmed input is returned unchanged.
///
/// The result always borrows from `s`; nothing is allocated or validated as JSON.
fn extract_json(s: &str) -> &str {
    // Slice from the first '{' to the last '}' when they appear in that order.
    fn brace_span(text: &str) -> Option<&str> {
        let start = text.find('{')?;
        let end = text.rfind('}')?;
        if start < end {
            Some(&text[start..=end])
        } else {
            None
        }
    }

    let trimmed = s.trim();

    // Text between the opening fence and the LAST closing fence, if both exist.
    let fenced = trimmed
        .strip_prefix("```")
        .and_then(|rest| rest.rfind("```").map(|close| rest[..close].trim()));

    match fenced {
        // Narrowing to the braces drops whatever info string followed the opening fence,
        // so an uppercase or extended tag no longer ends up in the returned slice.
        Some(inner) => brace_span(inner)
            .unwrap_or_else(|| inner.strip_prefix("json").map_or(inner, str::trim)),
        None => brace_span(trimmed).unwrap_or(trimmed),
    }
}
890
891#[cfg(test)]
892mod tests {
893    use std::sync::Arc;
894
895    use super::*;
896    use crate::in_memory_store::InMemoryVectorStore;
897    use crate::sqlite::SqliteStore;
898
899    async fn setup() -> GraphStore {
900        let store = SqliteStore::new(":memory:").await.unwrap();
901        GraphStore::new(store.pool().clone())
902    }
903
904    async fn setup_with_embedding() -> (GraphStore, Arc<EmbeddingStore>) {
905        let sqlite = SqliteStore::new(":memory:").await.unwrap();
906        let pool = sqlite.pool().clone();
907        let mem_store = Box::new(InMemoryVectorStore::new());
908        let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool));
909        let gs = GraphStore::new(sqlite.pool().clone());
910        (gs, emb)
911    }
912
913    fn make_mock_provider_with_embedding(embedding: Vec<f32>) -> zeph_llm::mock::MockProvider {
914        let mut p = zeph_llm::mock::MockProvider::default();
915        p.embedding = embedding;
916        p.supports_embeddings = true;
917        p
918    }
919
920    // ── Existing tests (resolve() with no embedding store — exact match only) ──
921
922    #[tokio::test]
923    async fn resolve_creates_new_entity() {
924        let gs = setup().await;
925        let resolver = EntityResolver::new(&gs);
926        let (id, outcome) = resolver
927            .resolve("alice", "person", Some("a person"))
928            .await
929            .unwrap();
930        assert!(id > 0);
931        assert_eq!(outcome, ResolutionOutcome::Created);
932    }
933
934    #[tokio::test]
935    async fn resolve_updates_existing_entity() {
936        let gs = setup().await;
937        let resolver = EntityResolver::new(&gs);
938        let (id1, _) = resolver.resolve("alice", "person", None).await.unwrap();
939        let (id2, outcome) = resolver
940            .resolve("alice", "person", Some("updated summary"))
941            .await
942            .unwrap();
943        assert_eq!(id1, id2);
944        assert_eq!(outcome, ResolutionOutcome::ExactMatch);
945
946        let entity = gs
947            .find_entity("alice", EntityType::Person)
948            .await
949            .unwrap()
950            .unwrap();
951        assert_eq!(entity.summary.as_deref(), Some("updated summary"));
952    }
953
954    #[tokio::test]
955    async fn resolve_unknown_type_falls_back_to_concept() {
956        let gs = setup().await;
957        let resolver = EntityResolver::new(&gs);
958        let (id, _) = resolver
959            .resolve("my_thing", "unknown_type", None)
960            .await
961            .unwrap();
962        assert!(id > 0);
963
964        // Verify it was stored as Concept
965        let entity = gs
966            .find_entity("my_thing", EntityType::Concept)
967            .await
968            .unwrap();
969        assert!(entity.is_some());
970    }
971
972    #[tokio::test]
973    async fn resolve_empty_name_returns_error() {
974        let gs = setup().await;
975        let resolver = EntityResolver::new(&gs);
976
977        let result_empty = resolver.resolve("", "concept", None).await;
978        assert!(result_empty.is_err());
979        assert!(matches!(
980            result_empty.unwrap_err(),
981            MemoryError::GraphStore(_)
982        ));
983
984        let result_whitespace = resolver.resolve("   ", "concept", None).await;
985        assert!(result_whitespace.is_err());
986    }
987
988    #[tokio::test]
989    async fn resolve_case_insensitive() {
990        let gs = setup().await;
991        let resolver = EntityResolver::new(&gs);
992
993        let (id1, _) = resolver.resolve("Rust", "language", None).await.unwrap();
994        let (id2, outcome) = resolver.resolve("rust", "language", None).await.unwrap();
995        assert_eq!(
996            id1, id2,
997            "'Rust' and 'rust' should resolve to the same entity"
998        );
999        assert_eq!(outcome, ResolutionOutcome::ExactMatch);
1000    }
1001
1002    #[tokio::test]
1003    async fn resolve_edge_inserts_new() {
1004        let gs = setup().await;
1005        let resolver = EntityResolver::new(&gs);
1006
1007        let src = gs
1008            .upsert_entity("src", "src", EntityType::Concept, None)
1009            .await
1010            .unwrap();
1011        let tgt = gs
1012            .upsert_entity("tgt", "tgt", EntityType::Concept, None)
1013            .await
1014            .unwrap();
1015
1016        let result = resolver
1017            .resolve_edge(src, tgt, "uses", "src uses tgt", 0.9, None)
1018            .await
1019            .unwrap();
1020        assert!(result.is_some());
1021        assert!(result.unwrap() > 0);
1022    }
1023
1024    #[tokio::test]
1025    async fn resolve_edge_deduplicates_identical() {
1026        let gs = setup().await;
1027        let resolver = EntityResolver::new(&gs);
1028
1029        let src = gs
1030            .upsert_entity("a", "a", EntityType::Concept, None)
1031            .await
1032            .unwrap();
1033        let tgt = gs
1034            .upsert_entity("b", "b", EntityType::Concept, None)
1035            .await
1036            .unwrap();
1037
1038        let first = resolver
1039            .resolve_edge(src, tgt, "uses", "a uses b", 0.9, None)
1040            .await
1041            .unwrap();
1042        assert!(first.is_some());
1043
1044        let second = resolver
1045            .resolve_edge(src, tgt, "uses", "a uses b", 0.9, None)
1046            .await
1047            .unwrap();
1048        assert!(second.is_none(), "identical edge should be deduplicated");
1049    }
1050
1051    #[tokio::test]
1052    async fn resolve_edge_supersedes_contradictory() {
1053        let gs = setup().await;
1054        let resolver = EntityResolver::new(&gs);
1055
1056        let src = gs
1057            .upsert_entity("x", "x", EntityType::Concept, None)
1058            .await
1059            .unwrap();
1060        let tgt = gs
1061            .upsert_entity("y", "y", EntityType::Concept, None)
1062            .await
1063            .unwrap();
1064
1065        let first_id = resolver
1066            .resolve_edge(src, tgt, "prefers", "x prefers y (old)", 0.8, None)
1067            .await
1068            .unwrap()
1069            .unwrap();
1070
1071        let second_id = resolver
1072            .resolve_edge(src, tgt, "prefers", "x prefers y (new)", 0.9, None)
1073            .await
1074            .unwrap()
1075            .unwrap();
1076
1077        assert_ne!(first_id, second_id, "superseded edge should have a new ID");
1078
1079        // Old edge should be invalidated
1080        let active_count = gs.active_edge_count().await.unwrap();
1081        assert_eq!(active_count, 1, "only new edge should be active");
1082    }
1083
1084    #[tokio::test]
1085    async fn resolve_edge_direction_sensitive() {
1086        // A->B "uses" should not interfere with B->A "uses" dedup
1087        let gs = setup().await;
1088        let resolver = EntityResolver::new(&gs);
1089
1090        let a = gs
1091            .upsert_entity("node_a", "node_a", EntityType::Concept, None)
1092            .await
1093            .unwrap();
1094        let b = gs
1095            .upsert_entity("node_b", "node_b", EntityType::Concept, None)
1096            .await
1097            .unwrap();
1098
1099        // Insert A->B
1100        let id1 = resolver
1101            .resolve_edge(a, b, "uses", "A uses B", 0.9, None)
1102            .await
1103            .unwrap();
1104        assert!(id1.is_some());
1105
1106        // Insert B->A with different fact — should NOT invalidate A->B (different direction)
1107        let id2 = resolver
1108            .resolve_edge(b, a, "uses", "B uses A (different direction)", 0.9, None)
1109            .await
1110            .unwrap();
1111        assert!(id2.is_some());
1112
1113        // Both edges should still be active
1114        let active_count = gs.active_edge_count().await.unwrap();
1115        assert_eq!(active_count, 2, "both directional edges should be active");
1116    }
1117
1118    #[tokio::test]
1119    async fn resolve_edge_normalizes_relation_case() {
1120        let gs = setup().await;
1121        let resolver = EntityResolver::new(&gs);
1122
1123        let src = gs
1124            .upsert_entity("p", "p", EntityType::Concept, None)
1125            .await
1126            .unwrap();
1127        let tgt = gs
1128            .upsert_entity("q", "q", EntityType::Concept, None)
1129            .await
1130            .unwrap();
1131
1132        // Insert with uppercase relation
1133        let id1 = resolver
1134            .resolve_edge(src, tgt, "Uses", "p uses q", 0.9, None)
1135            .await
1136            .unwrap();
1137        assert!(id1.is_some());
1138
1139        // Insert with lowercase relation — same normalized relation, same fact → deduplicate
1140        let id2 = resolver
1141            .resolve_edge(src, tgt, "uses", "p uses q", 0.9, None)
1142            .await
1143            .unwrap();
1144        assert!(id2.is_none(), "normalized relations should deduplicate");
1145    }
1146
1147    // ── IC-01: entity_type lowercased before parse ────────────────────────────
1148
1149    #[tokio::test]
1150    async fn resolve_entity_type_uppercase_parsed_correctly() {
1151        let gs = setup().await;
1152        let resolver = EntityResolver::new(&gs);
1153
1154        // "Person" (title case from LLM) should parse as EntityType::Person, not fall back to Concept
1155        let (id, _) = resolver
1156            .resolve("test_entity", "Person", None)
1157            .await
1158            .unwrap();
1159        assert!(id > 0);
1160
1161        let entity = gs
1162            .find_entity("test_entity", EntityType::Person)
1163            .await
1164            .unwrap();
1165        assert!(entity.is_some(), "entity should be stored as Person type");
1166    }
1167
1168    #[tokio::test]
1169    async fn resolve_entity_type_all_caps_parsed_correctly() {
1170        let gs = setup().await;
1171        let resolver = EntityResolver::new(&gs);
1172
1173        let (id, _) = resolver.resolve("my_lang", "LANGUAGE", None).await.unwrap();
1174        assert!(id > 0);
1175
1176        let entity = gs
1177            .find_entity("my_lang", EntityType::Language)
1178            .await
1179            .unwrap();
1180        assert!(entity.is_some(), "entity should be stored as Language type");
1181    }
1182
1183    // ── SEC-GRAPH-01: entity name length cap ──────────────────────────────────
1184
1185    #[tokio::test]
1186    async fn resolve_truncates_long_entity_name() {
1187        let gs = setup().await;
1188        let resolver = EntityResolver::new(&gs);
1189
1190        let long_name = "a".repeat(1024);
1191        let (id, _) = resolver.resolve(&long_name, "concept", None).await.unwrap();
1192        assert!(id > 0);
1193
1194        // Entity should exist with a truncated name (512 bytes)
1195        let entity = gs
1196            .find_entity(&"a".repeat(512), EntityType::Concept)
1197            .await
1198            .unwrap();
1199        assert!(entity.is_some(), "truncated name should be stored");
1200    }
1201
1202    // ── SEC-GRAPH-02: control character stripping ─────────────────────────────
1203
1204    #[tokio::test]
1205    async fn resolve_strips_control_chars_from_name() {
1206        let gs = setup().await;
1207        let resolver = EntityResolver::new(&gs);
1208
1209        // Name with null byte and a BiDi override
1210        let name_with_ctrl = "rust\x00lang";
1211        let (id, _) = resolver
1212            .resolve(name_with_ctrl, "language", None)
1213            .await
1214            .unwrap();
1215        assert!(id > 0);
1216
1217        // Stored name should have control chars removed
1218        let entity = gs
1219            .find_entity("rustlang", EntityType::Language)
1220            .await
1221            .unwrap();
1222        assert!(
1223            entity.is_some(),
1224            "control chars should be stripped from stored name"
1225        );
1226    }
1227
1228    #[tokio::test]
1229    async fn resolve_strips_bidi_overrides_from_name() {
1230        let gs = setup().await;
1231        let resolver = EntityResolver::new(&gs);
1232
1233        // U+202E is RIGHT-TO-LEFT OVERRIDE — a BiDi spoof character
1234        let name_with_bidi = "rust\u{202E}lang";
1235        let (id, _) = resolver
1236            .resolve(name_with_bidi, "language", None)
1237            .await
1238            .unwrap();
1239        assert!(id > 0);
1240
1241        let entity = gs
1242            .find_entity("rustlang", EntityType::Language)
1243            .await
1244            .unwrap();
1245        assert!(entity.is_some(), "BiDi override chars should be stripped");
1246    }
1247
1248    // ── Helper unit tests for sanitization functions ──────────────────────────
1249
1250    #[test]
1251    fn strip_control_chars_removes_ascii_controls() {
1252        assert_eq!(strip_control_chars("hello\x00world"), "helloworld");
1253        assert_eq!(strip_control_chars("tab\there"), "tabhere");
1254        assert_eq!(strip_control_chars("new\nline"), "newline");
1255    }
1256
1257    #[test]
1258    fn strip_control_chars_removes_bidi() {
1259        let bidi = "\u{202E}spoof";
1260        assert_eq!(strip_control_chars(bidi), "spoof");
1261    }
1262
1263    #[test]
1264    fn strip_control_chars_preserves_normal_unicode() {
1265        assert_eq!(strip_control_chars("привет мир"), "привет мир");
1266        assert_eq!(strip_control_chars("日本語"), "日本語");
1267    }
1268
1269    #[test]
1270    fn truncate_to_bytes_exact_boundary() {
1271        let s = "hello";
1272        assert_eq!(truncate_to_bytes(s, 5), "hello");
1273        assert_eq!(truncate_to_bytes(s, 3), "hel");
1274    }
1275
1276    #[test]
1277    fn truncate_to_bytes_respects_utf8_boundary() {
1278        // "é" is 2 bytes in UTF-8 — truncating at 1 byte should give ""
1279        let s = "élan";
1280        let truncated = truncate_to_bytes(s, 1);
1281        assert!(s.is_char_boundary(truncated.len()));
1282    }
1283
1284    // ── New tests: embedding-based resolution ─────────────────────────────────
1285
1286    #[tokio::test]
1287    async fn resolve_with_embedding_store_score_above_threshold_merges() {
1288        let (gs, emb) = setup_with_embedding().await;
1289        // Pre-insert an existing entity (different name to avoid exact match).
1290        // "python programming lang" is in Qdrant; we resolve "python scripting lang"
1291        // which embeds to the identical vector → cosine similarity = 1.0 > 0.85 → merge.
1292        let existing_id = gs
1293            .upsert_entity(
1294                "python programming lang",
1295                "python programming lang",
1296                EntityType::Language,
1297                Some("a programming language"),
1298            )
1299            .await
1300            .unwrap();
1301
1302        let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1303        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1304            .await
1305            .unwrap();
1306        let payload = serde_json::json!({
1307            "entity_id": existing_id,
1308            "name": "python programming lang",
1309            "entity_type": "language",
1310            "summary": "a programming language",
1311        });
1312        let point_id = emb
1313            .store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1314            .await
1315            .unwrap();
1316        gs.set_entity_qdrant_point_id(existing_id, &point_id)
1317            .await
1318            .unwrap();
1319
1320        // Mock provider returns the same vector for any text → cosine similarity = 1.0 > 0.85
1321        let provider = make_mock_provider_with_embedding(mock_vec);
1322        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1323
1324        let resolver = EntityResolver::new(&gs)
1325            .with_embedding_store(&emb)
1326            .with_provider(&any_provider)
1327            .with_thresholds(0.85, 0.70);
1328
1329        // Resolve a different name — no exact match, embedding match wins
1330        let (id, outcome) = resolver
1331            .resolve(
1332                "python scripting lang",
1333                "language",
1334                Some("scripting language"),
1335            )
1336            .await
1337            .unwrap();
1338
1339        assert_eq!(id, existing_id, "should return existing entity ID on merge");
1340        assert!(
1341            matches!(outcome, ResolutionOutcome::EmbeddingMatch { score } if score > 0.85),
1342            "outcome should be EmbeddingMatch with score > 0.85, got {outcome:?}"
1343        );
1344    }
1345
1346    #[tokio::test]
1347    async fn resolve_with_embedding_store_score_below_ambiguous_creates_new() {
1348        let (gs, emb) = setup_with_embedding().await;
1349        // Insert existing entity with orthogonal vector
1350        let existing_id = gs
1351            .upsert_entity("java", "java", EntityType::Language, Some("java language"))
1352            .await
1353            .unwrap();
1354
1355        // Existing uses [1,0,0,0]; new entity will embed to [0,1,0,0] (orthogonal, score=0)
1356        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1357            .await
1358            .unwrap();
1359        let payload = serde_json::json!({
1360            "entity_id": existing_id,
1361            "name": "java",
1362            "entity_type": "language",
1363            "summary": "java language",
1364        });
1365        emb.store_to_collection(ENTITY_COLLECTION, payload, vec![1.0, 0.0, 0.0, 0.0])
1366            .await
1367            .unwrap();
1368
1369        // Mock returns orthogonal vector → score = 0.0 < 0.70
1370        let provider = make_mock_provider_with_embedding(vec![0.0, 1.0, 0.0, 0.0]);
1371        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1372
1373        let resolver = EntityResolver::new(&gs)
1374            .with_embedding_store(&emb)
1375            .with_provider(&any_provider)
1376            .with_thresholds(0.85, 0.70);
1377
1378        let (id, outcome) = resolver
1379            .resolve("kotlin", "language", Some("kotlin language"))
1380            .await
1381            .unwrap();
1382
1383        assert_ne!(id, existing_id, "orthogonal entity should create new");
1384        assert_eq!(outcome, ResolutionOutcome::Created);
1385    }
1386
1387    #[tokio::test]
1388    async fn resolve_with_embedding_failure_falls_back_to_create() {
1389        // Use a mock with supports_embeddings=false — embed() returns EmbedUnsupported error,
1390        // which triggers the fallback path (create new entity).
1391        let sqlite2 = SqliteStore::new(":memory:").await.unwrap();
1392        let pool2 = sqlite2.pool().clone();
1393        let mem2 = Box::new(InMemoryVectorStore::new());
1394        let emb2 = Arc::new(EmbeddingStore::with_store(mem2, pool2));
1395        let gs2 = GraphStore::new(sqlite2.pool().clone());
1396
1397        let mut mock = zeph_llm::mock::MockProvider::default();
1398        mock.supports_embeddings = false;
1399        let any_provider = zeph_llm::any::AnyProvider::Mock(mock);
1400
1401        let resolver = EntityResolver::new(&gs2)
1402            .with_embedding_store(&emb2)
1403            .with_provider(&any_provider);
1404
1405        let (id, outcome) = resolver
1406            .resolve("testentity", "concept", Some("summary"))
1407            .await
1408            .unwrap();
1409        assert!(id > 0);
1410        assert_eq!(outcome, ResolutionOutcome::Created);
1411    }
1412
1413    #[tokio::test]
1414    async fn resolve_fallback_increments_counter() {
1415        let (gs, emb) = setup_with_embedding().await;
1416
1417        // Provider with embed that fails (supports_embeddings=false → EmbedUnsupported error)
1418        let mut mock = zeph_llm::mock::MockProvider::default();
1419        mock.supports_embeddings = false;
1420        let any_provider = zeph_llm::any::AnyProvider::Mock(mock);
1421
1422        let resolver = EntityResolver::new(&gs)
1423            .with_embedding_store(&emb)
1424            .with_provider(&any_provider);
1425
1426        let fallback_count = resolver.fallback_count();
1427
1428        // First call: embed fails → fallback
1429        resolver.resolve("entity_a", "concept", None).await.unwrap();
1430
1431        assert_eq!(
1432            fallback_count.load(std::sync::atomic::Ordering::Relaxed),
1433            1,
1434            "fallback counter should be 1 after embed failure"
1435        );
1436    }
1437
1438    #[tokio::test]
1439    async fn resolve_batch_processes_multiple_entities() {
1440        let gs = setup().await;
1441        let resolver = EntityResolver::new(&gs);
1442
1443        let entities = vec![
1444            ExtractedEntity {
1445                name: "rust".into(),
1446                entity_type: "language".into(),
1447                summary: Some("systems language".into()),
1448            },
1449            ExtractedEntity {
1450                name: "python".into(),
1451                entity_type: "language".into(),
1452                summary: None,
1453            },
1454            ExtractedEntity {
1455                name: "cargo".into(),
1456                entity_type: "tool".into(),
1457                summary: Some("rust build tool".into()),
1458            },
1459        ];
1460
1461        let results = resolver.resolve_batch(&entities).await.unwrap();
1462        assert_eq!(results.len(), 3);
1463        for (id, outcome) in &results {
1464            assert!(*id > 0);
1465            assert_eq!(*outcome, ResolutionOutcome::Created);
1466        }
1467    }
1468
1469    #[tokio::test]
1470    async fn resolve_batch_empty_returns_empty() {
1471        let gs = setup().await;
1472        let resolver = EntityResolver::new(&gs);
1473        let results = resolver.resolve_batch(&[]).await.unwrap();
1474        assert!(results.is_empty());
1475    }
1476
1477    #[tokio::test]
1478    async fn merge_combines_summaries() {
1479        let (gs, emb) = setup_with_embedding().await;
1480        // Use a different name for the existing entity so exact match doesn't trigger.
1481        // "mergetest v1" is stored with embedding; we then resolve "mergetest v2" which
1482        // embeds to the same vector → similarity = 1.0 > threshold → merge.
1483        let existing_id = gs
1484            .upsert_entity(
1485                "mergetest v1",
1486                "mergetest v1",
1487                EntityType::Concept,
1488                Some("first summary"),
1489            )
1490            .await
1491            .unwrap();
1492
1493        let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1494        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1495            .await
1496            .unwrap();
1497        let payload = serde_json::json!({
1498            "entity_id": existing_id,
1499            "name": "mergetest v1",
1500            "entity_type": "concept",
1501            "summary": "first summary",
1502        });
1503        let point_id = emb
1504            .store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1505            .await
1506            .unwrap();
1507        gs.set_entity_qdrant_point_id(existing_id, &point_id)
1508            .await
1509            .unwrap();
1510
1511        let provider = make_mock_provider_with_embedding(mock_vec);
1512        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1513
1514        let resolver = EntityResolver::new(&gs)
1515            .with_embedding_store(&emb)
1516            .with_provider(&any_provider)
1517            .with_thresholds(0.85, 0.70);
1518
1519        // Resolve "mergetest v2" — no exact match, but embedding is identical → merge
1520        let (id, outcome) = resolver
1521            .resolve("mergetest v2", "concept", Some("second summary"))
1522            .await
1523            .unwrap();
1524
1525        assert_eq!(id, existing_id);
1526        assert!(matches!(outcome, ResolutionOutcome::EmbeddingMatch { .. }));
1527
1528        // Verify the merged summary was updated on the existing entity
1529        let entity = gs
1530            .find_entity("mergetest v1", EntityType::Concept)
1531            .await
1532            .unwrap()
1533            .unwrap();
1534        let summary = entity.summary.unwrap_or_default();
1535        assert!(
1536            summary.contains("first summary") && summary.contains("second summary"),
1537            "merged summary should contain both: got {summary:?}"
1538        );
1539    }
1540
1541    #[tokio::test]
1542    async fn merge_preserves_older_entity_id() {
1543        let (gs, emb) = setup_with_embedding().await;
1544        // "legacy entity" stored with embedding; "legacy entity variant" has same vector → merge
1545        let existing_id = gs
1546            .upsert_entity(
1547                "legacy entity",
1548                "legacy entity",
1549                EntityType::Concept,
1550                Some("old info"),
1551            )
1552            .await
1553            .unwrap();
1554
1555        let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1556        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1557            .await
1558            .unwrap();
1559        let payload = serde_json::json!({
1560            "entity_id": existing_id,
1561            "name": "legacy entity",
1562            "entity_type": "concept",
1563            "summary": "old info",
1564        });
1565        emb.store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1566            .await
1567            .unwrap();
1568
1569        let provider = make_mock_provider_with_embedding(mock_vec);
1570        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1571
1572        let resolver = EntityResolver::new(&gs)
1573            .with_embedding_store(&emb)
1574            .with_provider(&any_provider)
1575            .with_thresholds(0.85, 0.70);
1576
1577        let (returned_id, _) = resolver
1578            .resolve("legacy entity variant", "concept", Some("new info"))
1579            .await
1580            .unwrap();
1581
1582        assert_eq!(
1583            returned_id, existing_id,
1584            "older entity ID should be preserved on merge"
1585        );
1586    }
1587
1588    #[tokio::test]
1589    async fn entity_type_filter_prevents_cross_type_merge() {
1590        let (gs, emb) = setup_with_embedding().await;
1591
1592        // Insert a Person named "python"
1593        let person_id = gs
1594            .upsert_entity(
1595                "python",
1596                "python",
1597                EntityType::Person,
1598                Some("a person named python"),
1599            )
1600            .await
1601            .unwrap();
1602
1603        let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1604        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1605            .await
1606            .unwrap();
1607        let payload = serde_json::json!({
1608            "entity_id": person_id,
1609            "name": "python",
1610            "entity_type": "person",
1611            "summary": "a person named python",
1612        });
1613        emb.store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1614            .await
1615            .unwrap();
1616
1617        let provider = make_mock_provider_with_embedding(mock_vec);
1618        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1619
1620        let resolver = EntityResolver::new(&gs)
1621            .with_embedding_store(&emb)
1622            .with_provider(&any_provider)
1623            .with_thresholds(0.85, 0.70);
1624
1625        // Resolve "python" as Language — should NOT merge with the Person entity
1626        let (lang_id, outcome) = resolver
1627            .resolve("python", "language", Some("python language"))
1628            .await
1629            .unwrap();
1630
1631        // The entity_type filter should prevent merging person "python" with language "python"
1632        // Check: either created new or an exact match was found under language type
1633        assert_ne!(
1634            lang_id, person_id,
1635            "language entity should not merge with person entity"
1636        );
1637        // The entity_type filter causes no embedding candidate to survive the type filter,
1638        // so resolution falls back to creating a new entity.
1639        assert_eq!(outcome, ResolutionOutcome::Created);
1640    }
1641
1642    #[tokio::test]
1643    async fn custom_thresholds_respected() {
1644        let (gs, emb) = setup_with_embedding().await;
1645        // With a very high threshold (1.0), even identical vectors won't merge
1646        // (they'd score exactly 1.0 which is NOT > 1.0, so... let's use 0.5 threshold
1647        // and verify score below 0.5 creates new)
1648        let existing_id = gs
1649            .upsert_entity(
1650                "threshold_test",
1651                "threshold_test",
1652                EntityType::Concept,
1653                Some("base"),
1654            )
1655            .await
1656            .unwrap();
1657
1658        let existing_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1659        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1660            .await
1661            .unwrap();
1662        let payload = serde_json::json!({
1663            "entity_id": existing_id,
1664            "name": "threshold_test",
1665            "entity_type": "concept",
1666            "summary": "base",
1667        });
1668        emb.store_to_collection(ENTITY_COLLECTION, payload, existing_vec)
1669            .await
1670            .unwrap();
1671
1672        // Orthogonal vector → score = 0.0
1673        let provider = make_mock_provider_with_embedding(vec![0.0, 1.0, 0.0, 0.0]);
1674        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1675
1676        // With thresholds 0.50/0.30, score=0 is below 0.30 → create new
1677        let resolver = EntityResolver::new(&gs)
1678            .with_embedding_store(&emb)
1679            .with_provider(&any_provider)
1680            .with_thresholds(0.50, 0.30);
1681
1682        let (id, outcome) = resolver
1683            .resolve("new_concept", "concept", Some("different"))
1684            .await
1685            .unwrap();
1686
1687        assert_ne!(id, existing_id);
1688        assert_eq!(outcome, ResolutionOutcome::Created);
1689    }
1690
1691    #[tokio::test]
1692    async fn resolve_outcome_exact_match_no_embedding_store() {
1693        let gs = setup().await;
1694        let resolver = EntityResolver::new(&gs);
1695
1696        resolver.resolve("existing", "concept", None).await.unwrap();
1697        let (_, outcome) = resolver.resolve("existing", "concept", None).await.unwrap();
1698        assert_eq!(outcome, ResolutionOutcome::ExactMatch);
1699    }
1700
1701    #[tokio::test]
1702    async fn extract_json_strips_markdown_fences() {
1703        let with_fence = "```json\n{\"same_entity\": true}\n```";
1704        let extracted = extract_json(with_fence);
1705        let parsed: DisambiguationResponse = serde_json::from_str(extracted).unwrap();
1706        assert!(parsed.same_entity);
1707
1708        let without_fence = "{\"same_entity\": false}";
1709        let extracted2 = extract_json(without_fence);
1710        let parsed2: DisambiguationResponse = serde_json::from_str(extracted2).unwrap();
1711        assert!(!parsed2.same_entity);
1712    }
1713
1714    // Helper: build a MockProvider with embeddings enabled, given vector, and queued chat responses.
1715    fn make_mock_with_embedding_and_chat(
1716        embedding: Vec<f32>,
1717        chat_responses: Vec<String>,
1718    ) -> zeph_llm::mock::MockProvider {
1719        let mut p = zeph_llm::mock::MockProvider::with_responses(chat_responses);
1720        p.embedding = embedding;
1721        p.supports_embeddings = true;
1722        p
1723    }
1724
1725    // Seed an existing entity into both SQLite and InMemoryVectorStore at a known vector.
1726    async fn seed_entity_with_vector(
1727        gs: &GraphStore,
1728        emb: &Arc<EmbeddingStore>,
1729        name: &str,
1730        entity_type: EntityType,
1731        summary: &str,
1732        vector: Vec<f32>,
1733    ) -> i64 {
1734        let id = gs
1735            .upsert_entity(name, name, entity_type, Some(summary))
1736            .await
1737            .unwrap();
1738        emb.ensure_named_collection(ENTITY_COLLECTION, u64::try_from(vector.len()).unwrap())
1739            .await
1740            .unwrap();
1741        let payload = serde_json::json!({
1742            "entity_id": id,
1743            "name": name,
1744            "entity_type": entity_type.as_str(),
1745            "summary": summary,
1746        });
1747        let point_id = emb
1748            .store_to_collection(ENTITY_COLLECTION, payload, vector)
1749            .await
1750            .unwrap();
1751        gs.set_entity_qdrant_point_id(id, &point_id).await.unwrap();
1752        id
1753    }
1754
1755    // ── GAP-1: ambiguous score + LLM says same_entity=true → LlmDisambiguated ─
1756
1757    #[tokio::test]
1758    async fn resolve_ambiguous_score_llm_says_merge() {
1759        // existing entity at [1,0,0,0]; new entity embeds to [1,1,0,0] → cosine ≈ 0.707
1760        // thresholds: similarity=0.85, ambiguous=0.50 → score 0.707 is in [0.50, 0.85)
1761        let (gs, emb) = setup_with_embedding().await;
1762        let existing_id = seed_entity_with_vector(
1763            &gs,
1764            &emb,
1765            "goroutine",
1766            EntityType::Concept,
1767            "go concurrency primitive",
1768            vec![1.0, 0.0, 0.0, 0.0],
1769        )
1770        .await;
1771
1772        // LLM responds with same_entity=true → should merge
1773        let provider = make_mock_with_embedding_and_chat(
1774            vec![1.0, 1.0, 0.0, 0.0],
1775            vec![r#"{"same_entity": true}"#.to_owned()],
1776        );
1777        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1778
1779        let resolver = EntityResolver::new(&gs)
1780            .with_embedding_store(&emb)
1781            .with_provider(&any_provider)
1782            .with_thresholds(0.85, 0.50);
1783
1784        let (id, outcome) = resolver
1785            .resolve("goroutine concurrency", "concept", Some("go concurrency"))
1786            .await
1787            .unwrap();
1788
1789        assert_eq!(
1790            id, existing_id,
1791            "should return existing entity ID on LLM merge"
1792        );
1793        assert_eq!(outcome, ResolutionOutcome::LlmDisambiguated);
1794    }
1795
1796    // ── GAP-2: ambiguous score + LLM says same_entity=false → Created ──────────
1797
1798    #[tokio::test]
1799    async fn resolve_ambiguous_score_llm_says_different() {
1800        let (gs, emb) = setup_with_embedding().await;
1801        let existing_id = seed_entity_with_vector(
1802            &gs,
1803            &emb,
1804            "channel",
1805            EntityType::Concept,
1806            "go channel",
1807            vec![1.0, 0.0, 0.0, 0.0],
1808        )
1809        .await;
1810
1811        // LLM responds with same_entity=false → should create new entity
1812        let provider = make_mock_with_embedding_and_chat(
1813            vec![1.0, 1.0, 0.0, 0.0],
1814            vec![r#"{"same_entity": false}"#.to_owned()],
1815        );
1816        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1817
1818        let resolver = EntityResolver::new(&gs)
1819            .with_embedding_store(&emb)
1820            .with_provider(&any_provider)
1821            .with_thresholds(0.85, 0.50);
1822
1823        let (id, outcome) = resolver
1824            .resolve("network channel", "concept", Some("networking channel"))
1825            .await
1826            .unwrap();
1827
1828        assert_ne!(
1829            id, existing_id,
1830            "LLM-rejected match should create new entity"
1831        );
1832        assert_eq!(outcome, ResolutionOutcome::Created);
1833    }
1834
1835    // ── GAP-3: ambiguous score + LLM chat fails → fallback counter incremented ─
1836
1837    #[tokio::test]
1838    async fn resolve_ambiguous_score_llm_failure_increments_fallback() {
1839        let (gs, emb) = setup_with_embedding().await;
1840        let existing_id = seed_entity_with_vector(
1841            &gs,
1842            &emb,
1843            "mutex",
1844            EntityType::Concept,
1845            "mutual exclusion lock",
1846            vec![1.0, 0.0, 0.0, 0.0],
1847        )
1848        .await;
1849
1850        // fail_chat=true → provider.chat() returns Err → None from llm_disambiguate
1851        let mut provider = make_mock_with_embedding_and_chat(vec![1.0, 1.0, 0.0, 0.0], vec![]);
1852        provider.fail_chat = true;
1853        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1854
1855        let resolver = EntityResolver::new(&gs)
1856            .with_embedding_store(&emb)
1857            .with_provider(&any_provider)
1858            .with_thresholds(0.85, 0.50);
1859
1860        let fallback_count = resolver.fallback_count();
1861
1862        let (id, outcome) = resolver
1863            .resolve("mutex lock", "concept", Some("synchronization primitive"))
1864            .await
1865            .unwrap();
1866
1867        // LLM failure → fallback to create new
1868        assert_ne!(
1869            id, existing_id,
1870            "LLM failure should create new entity (fallback)"
1871        );
1872        assert_eq!(outcome, ResolutionOutcome::Created);
1873        assert_eq!(
1874            fallback_count.load(std::sync::atomic::Ordering::Relaxed),
1875            1,
1876            "fallback counter should be incremented on LLM chat failure"
1877        );
1878    }
1879
1880    // ── Canonicalization / alias tests ────────────────────────────────────────
1881
1882    #[tokio::test]
1883    async fn resolve_creates_entity_with_canonical_name() {
1884        let gs = setup().await;
1885        let resolver = EntityResolver::new(&gs);
1886        let (id, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1887        assert!(id > 0);
1888        let entity = gs
1889            .find_entity("rust", EntityType::Language)
1890            .await
1891            .unwrap()
1892            .unwrap();
1893        assert_eq!(entity.canonical_name, "rust");
1894    }
1895
1896    #[tokio::test]
1897    async fn resolve_adds_alias_on_create() {
1898        let gs = setup().await;
1899        let resolver = EntityResolver::new(&gs);
1900        let (id, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1901        let aliases = gs.aliases_for_entity(id).await.unwrap();
1902        assert!(
1903            !aliases.is_empty(),
1904            "new entity should have at least one alias"
1905        );
1906        assert!(aliases.iter().any(|a| a.alias_name == "rust"));
1907    }
1908
1909    #[tokio::test]
1910    async fn resolve_reuses_entity_by_alias() {
1911        let gs = setup().await;
1912        let resolver = EntityResolver::new(&gs);
1913
1914        // Create entity and register an alias
1915        let (id1, _) = resolver.resolve("rust", "language", None).await.unwrap();
1916        gs.add_alias(id1, "rust-lang").await.unwrap();
1917
1918        // Resolve using the alias — should return the same entity
1919        let (id2, _) = resolver
1920            .resolve("rust-lang", "language", None)
1921            .await
1922            .unwrap();
1923        assert_eq!(
1924            id1, id2,
1925            "'rust-lang' alias should resolve to same entity as 'rust'"
1926        );
1927    }
1928
1929    #[tokio::test]
1930    async fn resolve_alias_match_respects_entity_type() {
1931        let gs = setup().await;
1932        let resolver = EntityResolver::new(&gs);
1933
1934        // "python" as a Language
1935        let (lang_id, _) = resolver.resolve("python", "language", None).await.unwrap();
1936
1937        // "python" as a Tool should create a separate entity (different type)
1938        let (tool_id, _) = resolver.resolve("python", "tool", None).await.unwrap();
1939        assert_ne!(
1940            lang_id, tool_id,
1941            "same name with different type should be separate entities"
1942        );
1943    }
1944
1945    #[tokio::test]
1946    async fn resolve_preserves_existing_aliases() {
1947        let gs = setup().await;
1948        let resolver = EntityResolver::new(&gs);
1949
1950        let (id, _) = resolver.resolve("rust", "language", None).await.unwrap();
1951        gs.add_alias(id, "rust-lang").await.unwrap();
1952
1953        // Upserting same entity should not remove prior aliases
1954        resolver
1955            .resolve("rust", "language", Some("updated"))
1956            .await
1957            .unwrap();
1958        let aliases = gs.aliases_for_entity(id).await.unwrap();
1959        assert!(
1960            aliases.iter().any(|a| a.alias_name == "rust-lang"),
1961            "prior alias must be preserved"
1962        );
1963    }
1964
1965    #[tokio::test]
1966    async fn resolve_original_form_registered_as_alias() {
1967        let gs = setup().await;
1968        let resolver = EntityResolver::new(&gs);
1969
1970        // "  Rust  " — original trimmed lowercased form is "rust", same as normalized
1971        // So only one alias should be registered (no duplicate)
1972        let (id, _) = resolver
1973            .resolve("  Rust  ", "language", None)
1974            .await
1975            .unwrap();
1976        let aliases = gs.aliases_for_entity(id).await.unwrap();
1977        assert!(aliases.iter().any(|a| a.alias_name == "rust"));
1978    }
1979
1980    #[tokio::test]
1981    async fn resolve_entity_with_many_aliases() {
1982        let gs = setup().await;
1983        let id = gs
1984            .upsert_entity("bigentity", "bigentity", EntityType::Concept, None)
1985            .await
1986            .unwrap();
1987        for i in 0..100 {
1988            gs.add_alias(id, &format!("alias-{i}")).await.unwrap();
1989        }
1990        let aliases = gs.aliases_for_entity(id).await.unwrap();
1991        assert_eq!(aliases.len(), 100);
1992
1993        // Fuzzy search should still work via alias
1994        let results = gs.find_entities_fuzzy("alias-50", 10).await.unwrap();
1995        assert!(results.iter().any(|e| e.id == id));
1996    }
1997}