Skip to main content

zeph_memory/graph/
resolver.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_llm::any::AnyProvider;
12use zeph_llm::provider::{LlmProvider as _, Message, Role};
13
14use super::store::GraphStore;
15use super::types::EntityType;
16use crate::embedding_store::EmbeddingStore;
17use crate::error::MemoryError;
18use crate::graph::extractor::ExtractedEntity;
19use crate::types::MessageId;
20use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
21
/// Upper bound, in bytes, for entity names stored in the graph.
const MAX_ENTITY_NAME_BYTES: usize = 512;
/// Upper bound, in bytes, for normalized relation strings.
const MAX_RELATION_BYTES: usize = 256;
/// Upper bound, in bytes, for fact strings; also caps summaries used in embedding text.
const MAX_FACT_BYTES: usize = 2048;

/// Name of the Qdrant collection holding entity embeddings.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Seconds a single `embed()` call may run before it is abandoned.
const EMBED_TIMEOUT_SECS: u64 = 30;
34
/// Remove characters that must never reach stored names, relations or facts.
///
/// Drops every Unicode control character (`char::is_control`, general category `Cc`,
/// which covers C0 controls, DEL and C1 controls) as well as the `BiDi`
/// embedding/override codepoints U+202A..=U+202E and the `BiDi` isolate codepoints
/// U+2066..=U+2069. All other characters are kept in order.
fn strip_control_chars(s: &str) -> String {
    let mut cleaned = s.to_owned();
    cleaned.retain(|ch| {
        let is_bidi_control = matches!(u32::from(ch), 0x202A..=0x202E | 0x2066..=0x2069);
        !(ch.is_control() || is_bidi_control)
    });
    cleaned
}
41
/// Return the longest prefix of `s` that fits in `max_bytes` bytes and ends on a UTF-8
/// character boundary.
///
/// When `s` already fits it is returned unchanged; a multi-byte character straddling the
/// limit is dropped entirely rather than split.
fn truncate_to_bytes(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Index 0 is always a char boundary, so the search always succeeds.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
53
/// How [`EntityResolver::resolve`]-style resolution settled on an entity id.
#[derive(Debug, Clone, PartialEq)]
pub enum ResolutionOutcome {
    /// The normalized name matched an existing alias or canonical name of the same type
    /// in `SQLite`.
    ExactMatch,
    /// The best embedding candidate reached the merge threshold; `score` is its cosine
    /// similarity.
    EmbeddingMatch {
        /// Cosine similarity of the merged candidate.
        score: f32,
    },
    /// The similarity was ambiguous and the LLM confirmed both mentions are one entity.
    LlmDisambiguated,
    /// No existing entity matched, so a new one was created.
    Created,
}
66
67/// LLM response for entity disambiguation.
68#[derive(Debug, Deserialize, JsonSchema)]
69struct DisambiguationResponse {
70    same_entity: bool,
71}
72
73/// Per-entity-name lock guard to prevent concurrent duplicate creation.
74///
75/// Keyed by normalized entity name. Entities with different names resolve concurrently;
76/// entities with the same name are serialized.
77///
78/// TODO(SEC-M33-02): This map grows unboundedly — one entry per unique normalized name.
79/// For a short-lived resolver this is acceptable. If the resolver becomes long-lived
80/// (stored in `SemanticMemory`), add eviction or use a fixed-size sharded lock array.
81type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
82
83pub struct EntityResolver<'a> {
84    store: &'a GraphStore,
85    embedding_store: Option<&'a Arc<EmbeddingStore>>,
86    provider: Option<&'a AnyProvider>,
87    similarity_threshold: f32,
88    ambiguous_threshold: f32,
89    name_locks: NameLockMap,
90    /// Counter for error-triggered fallbacks (embed/LLM failures). Tests can read this via Arc.
91    fallback_count: Arc<std::sync::atomic::AtomicU64>,
92}
93
94impl<'a> EntityResolver<'a> {
95    #[must_use]
96    pub fn new(store: &'a GraphStore) -> Self {
97        Self {
98            store,
99            embedding_store: None,
100            provider: None,
101            similarity_threshold: 0.85,
102            ambiguous_threshold: 0.70,
103            name_locks: Arc::new(DashMap::new()),
104            fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
105        }
106    }
107
108    #[must_use]
109    pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
110        self.embedding_store = Some(store);
111        self
112    }
113
114    #[must_use]
115    pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
116        self.provider = Some(provider);
117        self
118    }
119
120    #[must_use]
121    pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
122        self.similarity_threshold = similarity;
123        self.ambiguous_threshold = ambiguous;
124        self
125    }
126
127    /// Shared fallback counter — tests can clone this Arc to inspect the value.
128    #[must_use]
129    pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
130        Arc::clone(&self.fallback_count)
131    }
132
133    /// Normalize an entity name: trim, lowercase, strip control chars, truncate.
134    fn normalize_name(name: &str) -> String {
135        let lowered = name.trim().to_lowercase();
136        let cleaned = strip_control_chars(&lowered);
137        let normalized = truncate_to_bytes(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
138        if normalized.len() < cleaned.len() {
139            tracing::debug!(
140                "graph resolver: entity name truncated to {} bytes",
141                MAX_ENTITY_NAME_BYTES
142            );
143        }
144        normalized
145    }
146
147    /// Parse an entity type string, falling back to `Concept` on unknown values.
148    fn parse_entity_type(entity_type: &str) -> EntityType {
149        entity_type
150            .trim()
151            .to_lowercase()
152            .parse::<EntityType>()
153            .unwrap_or_else(|_| {
154                tracing::debug!(
155                    "graph resolver: unknown entity type {:?}, falling back to Concept",
156                    entity_type
157                );
158                EntityType::Concept
159            })
160    }
161
162    /// Acquire the per-name lock and return the guard. Keeps lock alive for the caller.
163    async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
164        let lock = self
165            .name_locks
166            .entry(normalized.to_owned())
167            .or_insert_with(|| Arc::new(Mutex::new(())))
168            .clone();
169        lock.lock_owned().await
170    }
171
172    /// Resolve an extracted entity using the alias-first canonicalization pipeline.
173    ///
174    /// Pipeline:
175    /// 1. Normalize: trim, lowercase, strip control chars, truncate to 512 bytes.
176    /// 2. Parse entity type (fallback to Concept on unknown).
177    /// 3. Alias lookup: search `graph_entity_aliases` by normalized name + `entity_type`.
178    ///    If found, touch `last_seen_at` and return the existing entity id.
179    /// 4. Canonical name lookup: search `graph_entities` by `canonical_name` + `entity_type`.
180    ///    If found, touch `last_seen_at` and return the existing entity id.
181    /// 5. When `embedding_store` and `provider` are configured, performs embedding-based fuzzy
182    ///    matching: cosine similarity search (Qdrant), LLM disambiguation for ambiguous range,
183    ///    merge or create based on result. Failures degrade gracefully to step 6.
184    /// 6. Create: upsert new entity with `canonical_name` = normalized name.
185    /// 7. Register the normalized form (and original trimmed form if different) as aliases.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if the entity name is empty after normalization, or if a DB operation fails.
190    #[allow(clippy::too_many_lines)]
191    pub async fn resolve(
192        &self,
193        name: &str,
194        entity_type: &str,
195        summary: Option<&str>,
196    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
197        let normalized = Self::normalize_name(name);
198
199        if normalized.is_empty() {
200            return Err(MemoryError::GraphStore("empty entity name".into()));
201        }
202
203        let et = Self::parse_entity_type(entity_type);
204
205        // The surface form preserves the original casing for user-facing display.
206        let surface_name = name.trim().to_owned();
207
208        // Acquire per-name lock to prevent concurrent duplicate creation.
209        let _guard = self.lock_name(&normalized).await;
210
211        // Step 3: alias-first lookup (filters by entity_type to prevent cross-type collisions).
212        if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
213            self.store
214                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
215                .await?;
216            return Ok((entity.id, ResolutionOutcome::ExactMatch));
217        }
218
219        // Step 4: canonical name lookup.
220        if let Some(entity) = self.store.find_entity(&normalized, et).await? {
221            self.store
222                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
223                .await?;
224            return Ok((entity.id, ResolutionOutcome::ExactMatch));
225        }
226
227        // Step 5: Embedding-based resolution (when configured).
228        if let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) {
229            let safe_summary = truncate_to_bytes(summary.unwrap_or(""), MAX_FACT_BYTES);
230            let embed_text = format!("{normalized}: {safe_summary}");
231
232            let embed_result = tokio::time::timeout(
233                std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
234                provider.embed(&embed_text),
235            )
236            .await;
237
238            match embed_result {
239                Ok(Ok(query_vec)) => {
240                    let type_filter = VectorFilter {
241                        must: vec![FieldCondition {
242                            field: "entity_type".into(),
243                            value: FieldValue::Text(et.as_str().to_owned()),
244                        }],
245                        must_not: vec![],
246                    };
247                    match emb_store
248                        .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
249                        .await
250                    {
251                        Ok(candidates) if !candidates.is_empty() => {
252                            let best = &candidates[0];
253                            let score = best.score;
254
255                            if score >= self.similarity_threshold {
256                                let entity_id = best
257                                    .payload
258                                    .get("entity_id")
259                                    .and_then(serde_json::Value::as_i64)
260                                    .ok_or_else(|| {
261                                        MemoryError::GraphStore(
262                                            "missing entity_id in payload".into(),
263                                        )
264                                    })?;
265
266                                self.merge_entity(
267                                    emb_store,
268                                    provider,
269                                    entity_id,
270                                    &surface_name,
271                                    &normalized,
272                                    et,
273                                    summary,
274                                )
275                                .await?;
276
277                                return Ok((
278                                    entity_id,
279                                    ResolutionOutcome::EmbeddingMatch { score },
280                                ));
281                            } else if score >= self.ambiguous_threshold {
282                                let entity_id = best
283                                    .payload
284                                    .get("entity_id")
285                                    .and_then(serde_json::Value::as_i64)
286                                    .ok_or_else(|| {
287                                        MemoryError::GraphStore(
288                                            "missing entity_id in payload".into(),
289                                        )
290                                    })?;
291                                let existing_name = best
292                                    .payload
293                                    .get("name")
294                                    .and_then(|v| v.as_str())
295                                    .unwrap_or("")
296                                    .to_owned();
297                                let existing_summary = best
298                                    .payload
299                                    .get("summary")
300                                    .and_then(|v| v.as_str())
301                                    .unwrap_or("")
302                                    .to_owned();
303                                // Use the existing entity's actual type from the payload (IC-S3)
304                                let existing_type = best
305                                    .payload
306                                    .get("entity_type")
307                                    .and_then(|v| v.as_str())
308                                    .unwrap_or(et.as_str())
309                                    .to_owned();
310
311                                match self
312                                    .llm_disambiguate(
313                                        provider,
314                                        &normalized,
315                                        et.as_str(),
316                                        summary.unwrap_or(""),
317                                        &existing_name,
318                                        &existing_type,
319                                        &existing_summary,
320                                        score,
321                                    )
322                                    .await
323                                {
324                                    Some(true) => {
325                                        self.merge_entity(
326                                            emb_store,
327                                            provider,
328                                            entity_id,
329                                            &surface_name,
330                                            &normalized,
331                                            et,
332                                            summary,
333                                        )
334                                        .await?;
335                                        return Ok((
336                                            entity_id,
337                                            ResolutionOutcome::LlmDisambiguated,
338                                        ));
339                                    }
340                                    Some(false) => {
341                                        // LLM says different entity — fall through to create
342                                    }
343                                    None => {
344                                        // LLM failed — increment fallback counter, create new
345                                        self.fallback_count
346                                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
347                                        tracing::warn!(
348                                            entity_name = %normalized,
349                                            "LLM disambiguation failed; falling back to create new entity"
350                                        );
351                                    }
352                                }
353                            }
354                            // score < ambiguous_threshold or LLM said different: fall through to create
355                        }
356                        Ok(_) => {
357                            // No candidates — fall through to create
358                        }
359                        Err(err) => {
360                            self.fallback_count
361                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
362                            tracing::warn!(
363                                entity_name = %normalized,
364                                error = %err,
365                                "Qdrant search failed; falling back to create new entity"
366                            );
367                        }
368                    }
369
370                    // No embedding match — create new entity and store embedding
371                    let entity_id = self
372                        .store
373                        .upsert_entity(&surface_name, &normalized, et, summary)
374                        .await?;
375
376                    self.register_aliases(entity_id, &normalized, name).await?;
377
378                    self.store_entity_embedding(
379                        emb_store,
380                        entity_id,
381                        None,
382                        &normalized,
383                        et,
384                        summary.unwrap_or(""),
385                        &query_vec,
386                    )
387                    .await;
388
389                    return Ok((entity_id, ResolutionOutcome::Created));
390                }
391                Ok(Err(err)) => {
392                    self.fallback_count
393                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
394                    tracing::warn!(
395                        entity_name = %normalized,
396                        error = %err,
397                        "embed() failed; falling back to exact-match-only entity creation"
398                    );
399                }
400                Err(_timeout) => {
401                    self.fallback_count
402                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
403                    tracing::warn!(
404                        entity_name = %normalized,
405                        "embed() timed out after {}s; falling back to create new entity",
406                        EMBED_TIMEOUT_SECS
407                    );
408                }
409            }
410        }
411
412        // Step 6: Create new entity (no embedding store, or embedding failure).
413        let entity_id = self
414            .store
415            .upsert_entity(&surface_name, &normalized, et, summary)
416            .await?;
417
418        self.register_aliases(entity_id, &normalized, name).await?;
419
420        Ok((entity_id, ResolutionOutcome::Created))
421    }
422
423    /// Register the normalized form and original trimmed form as aliases for an entity.
424    async fn register_aliases(
425        &self,
426        entity_id: i64,
427        normalized: &str,
428        original_name: &str,
429    ) -> Result<(), MemoryError> {
430        self.store.add_alias(entity_id, normalized).await?;
431
432        // Also register the original trimmed lowercased form if it differs from normalized
433        // (e.g. when control chars were stripped, leaving a shorter string).
434        let original_trimmed = original_name.trim().to_lowercase();
435        let original_clean_str = strip_control_chars(&original_trimmed);
436        let original_clean = truncate_to_bytes(&original_clean_str, MAX_ENTITY_NAME_BYTES);
437        if original_clean != normalized {
438            self.store.add_alias(entity_id, original_clean).await?;
439        }
440
441        Ok(())
442    }
443
444    /// Merge an existing entity with new information: combine summaries, update Qdrant.
445    #[allow(clippy::too_many_arguments)]
446    async fn merge_entity(
447        &self,
448        emb_store: &EmbeddingStore,
449        provider: &AnyProvider,
450        entity_id: i64,
451        new_surface_name: &str,
452        new_canonical_name: &str,
453        entity_type: EntityType,
454        new_summary: Option<&str>,
455    ) -> Result<(), MemoryError> {
456        // TODO(PERF-03): The Qdrant payload already contains name/summary at the call site;
457        // pass them in as parameters to eliminate this extra SQLite roundtrip per merge.
458        let existing = self.store.find_entity_by_id(entity_id).await?;
459        let existing_summary = existing
460            .as_ref()
461            .and_then(|e| e.summary.as_deref())
462            .unwrap_or("");
463
464        let merged_summary = if let Some(new) = new_summary {
465            if !new.is_empty() && !existing_summary.is_empty() {
466                let combined = format!("{existing_summary}; {new}");
467                // TODO(S2): use LLM-based summary merge when summary exceeds 512 bytes
468                truncate_to_bytes(&combined, MAX_FACT_BYTES).to_owned()
469            } else if !new.is_empty() {
470                new.to_owned()
471            } else {
472                existing_summary.to_owned()
473            }
474        } else {
475            existing_summary.to_owned()
476        };
477
478        let summary_opt = if merged_summary.is_empty() {
479            None
480        } else {
481            Some(merged_summary.as_str())
482        };
483
484        // Update the EXISTING entity's summary (keep its canonical_name, update surface display name).
485        let existing_canonical = existing.as_ref().map_or_else(
486            || new_canonical_name.to_owned(),
487            |e| e.canonical_name.clone(),
488        );
489        let existing_name_owned = existing
490            .as_ref()
491            .map_or_else(|| new_surface_name.to_owned(), |e| e.name.clone());
492        self.store
493            .upsert_entity(
494                &existing_name_owned,
495                &existing_canonical,
496                entity_type,
497                summary_opt,
498            )
499            .await?;
500
501        // Retrieve existing qdrant_point_id to reuse it (avoids orphaned stale points, IC-S1)
502        let existing_point_id = existing
503            .as_ref()
504            .and_then(|e| e.qdrant_point_id.as_deref())
505            .map(ToOwned::to_owned);
506
507        // Re-embed merged text and upsert to Qdrant
508        let embed_text = format!("{existing_name_owned}: {merged_summary}");
509        let embed_result = tokio::time::timeout(
510            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
511            provider.embed(&embed_text),
512        )
513        .await;
514
515        match embed_result {
516            Ok(Ok(vec)) => {
517                self.store_entity_embedding(
518                    emb_store,
519                    entity_id,
520                    existing_point_id.as_deref(),
521                    &existing_name_owned,
522                    entity_type,
523                    &merged_summary,
524                    &vec,
525                )
526                .await;
527            }
528            Ok(Err(err)) => {
529                tracing::warn!(
530                    entity_id,
531                    error = %err,
532                    "merge re-embed failed; Qdrant entry may be stale"
533                );
534            }
535            Err(_) => {
536                tracing::warn!(
537                    entity_id,
538                    "merge re-embed timed out; Qdrant entry may be stale"
539                );
540            }
541        }
542
543        Ok(())
544    }
545
546    /// Store an entity embedding in Qdrant and update `qdrant_point_id` in `SQLite`.
547    ///
548    /// When `existing_point_id` is `Some`, the existing Qdrant point is updated in-place
549    /// (upsert by ID) to avoid orphaned stale points. When `None`, a new point is created.
550    ///
551    /// Failures are logged at warn level but do not propagate — the entity is still
552    /// valid in `SQLite` even if Qdrant upsert fails.
553    #[allow(clippy::too_many_arguments)]
554    async fn store_entity_embedding(
555        &self,
556        emb_store: &EmbeddingStore,
557        entity_id: i64,
558        existing_point_id: Option<&str>,
559        name: &str,
560        entity_type: EntityType,
561        summary: &str,
562        vector: &[f32],
563    ) {
564        // TODO(PERF-05): ensure_named_collection() is called on every store_entity_embedding()
565        // invocation, generating one Qdrant network roundtrip per entity in a batch. Cache this
566        // result at resolver construction time via `std::sync::OnceLock<bool>` to call it once.
567        let vector_size = u64::try_from(vector.len()).unwrap_or(384);
568        if let Err(err) = emb_store
569            .ensure_named_collection(ENTITY_COLLECTION, vector_size)
570            .await
571        {
572            tracing::error!(
573                error = %err,
574                "failed to ensure entity embedding collection; skipping Qdrant upsert"
575            );
576            return;
577        }
578
579        let payload = serde_json::json!({
580            "entity_id": entity_id,
581            "name": name,
582            "entity_type": entity_type.as_str(),
583            "summary": summary,
584        });
585
586        if let Some(point_id) = existing_point_id {
587            // Reuse existing point to avoid orphaned stale points (IC-S1)
588            if let Err(err) = emb_store
589                .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
590                .await
591            {
592                tracing::warn!(
593                    entity_id,
594                    error = %err,
595                    "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
596                );
597            }
598        } else {
599            match emb_store
600                .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
601                .await
602            {
603                Ok(point_id) => {
604                    if let Err(err) = self
605                        .store
606                        .set_entity_qdrant_point_id(entity_id, &point_id)
607                        .await
608                    {
609                        tracing::warn!(
610                            entity_id,
611                            error = %err,
612                            "failed to store qdrant_point_id in SQLite"
613                        );
614                    }
615                }
616                Err(err) => {
617                    tracing::warn!(
618                        entity_id,
619                        error = %err,
620                        "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
621                    );
622                }
623            }
624        }
625    }
626
627    /// Ask the LLM whether two entities are the same.
628    ///
629    /// Returns `Some(true)` for merge, `Some(false)` for separate, `None` on failure.
630    #[allow(clippy::too_many_arguments)]
631    async fn llm_disambiguate(
632        &self,
633        provider: &AnyProvider,
634        new_name: &str,
635        new_type: &str,
636        new_summary: &str,
637        existing_name: &str,
638        existing_type: &str,
639        existing_summary: &str,
640        score: f32,
641    ) -> Option<bool> {
642        let prompt = format!(
643            "New entity:\n\
644             - Name: <external-data>{new_name}</external-data>\n\
645             - Type: <external-data>{new_type}</external-data>\n\
646             - Summary: <external-data>{new_summary}</external-data>\n\
647             \n\
648             Existing entity:\n\
649             - Name: <external-data>{existing_name}</external-data>\n\
650             - Type: <external-data>{existing_type}</external-data>\n\
651             - Summary: <external-data>{existing_summary}</external-data>\n\
652             \n\
653             Cosine similarity: {score:.2}\n\
654             \n\
655             Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
656        );
657
658        let messages = [
659            Message::from_legacy(
660                Role::System,
661                "You are an entity disambiguation assistant. Given a new entity mention and \
662                 an existing entity from the knowledge graph, determine if they refer to the same \
663                 real-world entity. Respond only with JSON.",
664            ),
665            Message::from_legacy(Role::User, prompt),
666        ];
667
668        let response = match provider.chat(&messages).await {
669            Ok(r) => r,
670            Err(err) => {
671                tracing::warn!(error = %err, "LLM disambiguation chat failed");
672                return None;
673            }
674        };
675
676        // Parse JSON response, tolerating markdown code fences
677        let json_str = extract_json(&response);
678        match serde_json::from_str::<DisambiguationResponse>(json_str) {
679            Ok(parsed) => Some(parsed.same_entity),
680            Err(err) => {
681                tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
682                None
683            }
684        }
685    }
686
687    /// Resolve a batch of extracted entities concurrently.
688    ///
689    /// Returns a `Vec` of `(entity_id, ResolutionOutcome)` in the same order as input.
690    ///
691    /// # Errors
692    ///
693    /// Returns an error if any DB operation fails.
694    ///
695    /// # Panics
696    ///
697    /// Panics if an internal stream collection bug causes a result index to be missing.
698    /// This indicates a programming error and should never occur in correct usage.
699    pub async fn resolve_batch(
700        &self,
701        entities: &[ExtractedEntity],
702    ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
703        if entities.is_empty() {
704            return Ok(Vec::new());
705        }
706
707        // Process up to 4 embed+resolve operations concurrently (IC-S2/PERF-01).
708        let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
709
710        let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
711            let name = e.name.clone();
712            let entity_type = e.entity_type.clone();
713            let summary = e.summary.clone();
714            async move {
715                let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
716                (i, result)
717            }
718        }))
719        .buffer_unordered(4);
720
721        while let Some((i, result)) = stream.next().await {
722            match result {
723                Ok(outcome) => results[i] = Some(outcome),
724                Err(err) => return Err(err),
725            }
726        }
727
728        Ok(results
729            .into_iter()
730            .enumerate()
731            .map(|(i, r)| {
732                r.unwrap_or_else(|| {
733                    tracing::warn!(
734                        index = i,
735                        "resolve_batch: missing result at index — bug in stream collection"
736                    );
737                    panic!("resolve_batch: missing result at index {i}")
738                })
739            })
740            .collect())
741    }
742
743    /// Resolve an extracted edge: deduplicate or supersede existing edges.
744    ///
745    /// - If an active edge with the same direction and relation exists with an identical fact,
746    ///   returns `None` (deduplicated).
747    /// - If an active edge with the same direction and relation exists with a different fact,
748    ///   invalidates the old edge and inserts the new one, returning `Some(new_id)`.
749    /// - If no matching edge exists, inserts a new edge and returns `Some(new_id)`.
750    ///
751    /// Relation and fact strings are sanitized (control chars stripped, length-capped).
752    ///
753    /// # Errors
754    ///
755    /// Returns an error if any database operation fails.
756    pub async fn resolve_edge(
757        &self,
758        source_id: i64,
759        target_id: i64,
760        relation: &str,
761        fact: &str,
762        confidence: f32,
763        episode_id: Option<MessageId>,
764    ) -> Result<Option<i64>, MemoryError> {
765        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
766        let normalized_relation = truncate_to_bytes(&relation_clean, MAX_RELATION_BYTES).to_owned();
767
768        let fact_clean = strip_control_chars(fact.trim());
769        let normalized_fact = truncate_to_bytes(&fact_clean, MAX_FACT_BYTES).to_owned();
770
771        // Fetch only exact-direction edges — no reverse edges to filter out
772        let existing_edges = self.store.edges_exact(source_id, target_id).await?;
773
774        let matching = existing_edges
775            .iter()
776            .find(|e| e.relation == normalized_relation);
777
778        if let Some(old) = matching {
779            if old.fact == normalized_fact {
780                // Exact duplicate — skip
781                return Ok(None);
782            }
783            // Same relation, different fact — supersede
784            self.store.invalidate_edge(old.id).await?;
785        }
786
787        let new_id = self
788            .store
789            .insert_edge(
790                source_id,
791                target_id,
792                &normalized_relation,
793                &normalized_fact,
794                confidence,
795                episode_id,
796            )
797            .await?;
798        Ok(Some(new_id))
799    }
800}
801
802/// Extract a JSON object from a string that may contain markdown code fences.
803fn extract_json(s: &str) -> &str {
804    let trimmed = s.trim();
805    // Strip ```json ... ``` or ``` ... ```
806    if let Some(inner) = trimmed.strip_prefix("```json")
807        && let Some(end) = inner.rfind("```")
808    {
809        return inner[..end].trim();
810    }
811    if let Some(inner) = trimmed.strip_prefix("```")
812        && let Some(end) = inner.rfind("```")
813    {
814        return inner[..end].trim();
815    }
816    // Find first '{' to last '}'
817    if let (Some(start), Some(end)) = (trimmed.find('{'), trimmed.rfind('}'))
818        && start <= end
819    {
820        return &trimmed[start..=end];
821    }
822    trimmed
823}
824
825#[cfg(test)]
826mod tests {
827    use std::sync::Arc;
828
829    use super::*;
830    use crate::in_memory_store::InMemoryVectorStore;
831    use crate::sqlite::SqliteStore;
832
833    async fn setup() -> GraphStore {
834        let store = SqliteStore::new(":memory:").await.unwrap();
835        GraphStore::new(store.pool().clone())
836    }
837
838    async fn setup_with_embedding() -> (GraphStore, Arc<EmbeddingStore>) {
839        let sqlite = SqliteStore::new(":memory:").await.unwrap();
840        let pool = sqlite.pool().clone();
841        let mem_store = Box::new(InMemoryVectorStore::new());
842        let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool));
843        let gs = GraphStore::new(sqlite.pool().clone());
844        (gs, emb)
845    }
846
847    fn make_mock_provider_with_embedding(embedding: Vec<f32>) -> zeph_llm::mock::MockProvider {
848        let mut p = zeph_llm::mock::MockProvider::default();
849        p.embedding = embedding;
850        p.supports_embeddings = true;
851        p
852    }
853
854    // ── Existing tests (resolve() with no embedding store — exact match only) ──
855
856    #[tokio::test]
857    async fn resolve_creates_new_entity() {
858        let gs = setup().await;
859        let resolver = EntityResolver::new(&gs);
860        let (id, outcome) = resolver
861            .resolve("alice", "person", Some("a person"))
862            .await
863            .unwrap();
864        assert!(id > 0);
865        assert_eq!(outcome, ResolutionOutcome::Created);
866    }
867
868    #[tokio::test]
869    async fn resolve_updates_existing_entity() {
870        let gs = setup().await;
871        let resolver = EntityResolver::new(&gs);
872        let (id1, _) = resolver.resolve("alice", "person", None).await.unwrap();
873        let (id2, outcome) = resolver
874            .resolve("alice", "person", Some("updated summary"))
875            .await
876            .unwrap();
877        assert_eq!(id1, id2);
878        assert_eq!(outcome, ResolutionOutcome::ExactMatch);
879
880        let entity = gs
881            .find_entity("alice", EntityType::Person)
882            .await
883            .unwrap()
884            .unwrap();
885        assert_eq!(entity.summary.as_deref(), Some("updated summary"));
886    }
887
888    #[tokio::test]
889    async fn resolve_unknown_type_falls_back_to_concept() {
890        let gs = setup().await;
891        let resolver = EntityResolver::new(&gs);
892        let (id, _) = resolver
893            .resolve("my_thing", "unknown_type", None)
894            .await
895            .unwrap();
896        assert!(id > 0);
897
898        // Verify it was stored as Concept
899        let entity = gs
900            .find_entity("my_thing", EntityType::Concept)
901            .await
902            .unwrap();
903        assert!(entity.is_some());
904    }
905
906    #[tokio::test]
907    async fn resolve_empty_name_returns_error() {
908        let gs = setup().await;
909        let resolver = EntityResolver::new(&gs);
910
911        let result_empty = resolver.resolve("", "concept", None).await;
912        assert!(result_empty.is_err());
913        assert!(matches!(
914            result_empty.unwrap_err(),
915            MemoryError::GraphStore(_)
916        ));
917
918        let result_whitespace = resolver.resolve("   ", "concept", None).await;
919        assert!(result_whitespace.is_err());
920    }
921
922    #[tokio::test]
923    async fn resolve_case_insensitive() {
924        let gs = setup().await;
925        let resolver = EntityResolver::new(&gs);
926
927        let (id1, _) = resolver.resolve("Rust", "language", None).await.unwrap();
928        let (id2, outcome) = resolver.resolve("rust", "language", None).await.unwrap();
929        assert_eq!(
930            id1, id2,
931            "'Rust' and 'rust' should resolve to the same entity"
932        );
933        assert_eq!(outcome, ResolutionOutcome::ExactMatch);
934    }
935
936    #[tokio::test]
937    async fn resolve_edge_inserts_new() {
938        let gs = setup().await;
939        let resolver = EntityResolver::new(&gs);
940
941        let src = gs
942            .upsert_entity("src", "src", EntityType::Concept, None)
943            .await
944            .unwrap();
945        let tgt = gs
946            .upsert_entity("tgt", "tgt", EntityType::Concept, None)
947            .await
948            .unwrap();
949
950        let result = resolver
951            .resolve_edge(src, tgt, "uses", "src uses tgt", 0.9, None)
952            .await
953            .unwrap();
954        assert!(result.is_some());
955        assert!(result.unwrap() > 0);
956    }
957
958    #[tokio::test]
959    async fn resolve_edge_deduplicates_identical() {
960        let gs = setup().await;
961        let resolver = EntityResolver::new(&gs);
962
963        let src = gs
964            .upsert_entity("a", "a", EntityType::Concept, None)
965            .await
966            .unwrap();
967        let tgt = gs
968            .upsert_entity("b", "b", EntityType::Concept, None)
969            .await
970            .unwrap();
971
972        let first = resolver
973            .resolve_edge(src, tgt, "uses", "a uses b", 0.9, None)
974            .await
975            .unwrap();
976        assert!(first.is_some());
977
978        let second = resolver
979            .resolve_edge(src, tgt, "uses", "a uses b", 0.9, None)
980            .await
981            .unwrap();
982        assert!(second.is_none(), "identical edge should be deduplicated");
983    }
984
985    #[tokio::test]
986    async fn resolve_edge_supersedes_contradictory() {
987        let gs = setup().await;
988        let resolver = EntityResolver::new(&gs);
989
990        let src = gs
991            .upsert_entity("x", "x", EntityType::Concept, None)
992            .await
993            .unwrap();
994        let tgt = gs
995            .upsert_entity("y", "y", EntityType::Concept, None)
996            .await
997            .unwrap();
998
999        let first_id = resolver
1000            .resolve_edge(src, tgt, "prefers", "x prefers y (old)", 0.8, None)
1001            .await
1002            .unwrap()
1003            .unwrap();
1004
1005        let second_id = resolver
1006            .resolve_edge(src, tgt, "prefers", "x prefers y (new)", 0.9, None)
1007            .await
1008            .unwrap()
1009            .unwrap();
1010
1011        assert_ne!(first_id, second_id, "superseded edge should have a new ID");
1012
1013        // Old edge should be invalidated
1014        let active_count = gs.active_edge_count().await.unwrap();
1015        assert_eq!(active_count, 1, "only new edge should be active");
1016    }
1017
1018    #[tokio::test]
1019    async fn resolve_edge_direction_sensitive() {
1020        // A->B "uses" should not interfere with B->A "uses" dedup
1021        let gs = setup().await;
1022        let resolver = EntityResolver::new(&gs);
1023
1024        let a = gs
1025            .upsert_entity("node_a", "node_a", EntityType::Concept, None)
1026            .await
1027            .unwrap();
1028        let b = gs
1029            .upsert_entity("node_b", "node_b", EntityType::Concept, None)
1030            .await
1031            .unwrap();
1032
1033        // Insert A->B
1034        let id1 = resolver
1035            .resolve_edge(a, b, "uses", "A uses B", 0.9, None)
1036            .await
1037            .unwrap();
1038        assert!(id1.is_some());
1039
1040        // Insert B->A with different fact — should NOT invalidate A->B (different direction)
1041        let id2 = resolver
1042            .resolve_edge(b, a, "uses", "B uses A (different direction)", 0.9, None)
1043            .await
1044            .unwrap();
1045        assert!(id2.is_some());
1046
1047        // Both edges should still be active
1048        let active_count = gs.active_edge_count().await.unwrap();
1049        assert_eq!(active_count, 2, "both directional edges should be active");
1050    }
1051
1052    #[tokio::test]
1053    async fn resolve_edge_normalizes_relation_case() {
1054        let gs = setup().await;
1055        let resolver = EntityResolver::new(&gs);
1056
1057        let src = gs
1058            .upsert_entity("p", "p", EntityType::Concept, None)
1059            .await
1060            .unwrap();
1061        let tgt = gs
1062            .upsert_entity("q", "q", EntityType::Concept, None)
1063            .await
1064            .unwrap();
1065
1066        // Insert with uppercase relation
1067        let id1 = resolver
1068            .resolve_edge(src, tgt, "Uses", "p uses q", 0.9, None)
1069            .await
1070            .unwrap();
1071        assert!(id1.is_some());
1072
1073        // Insert with lowercase relation — same normalized relation, same fact → deduplicate
1074        let id2 = resolver
1075            .resolve_edge(src, tgt, "uses", "p uses q", 0.9, None)
1076            .await
1077            .unwrap();
1078        assert!(id2.is_none(), "normalized relations should deduplicate");
1079    }
1080
1081    // ── IC-01: entity_type lowercased before parse ────────────────────────────
1082
1083    #[tokio::test]
1084    async fn resolve_entity_type_uppercase_parsed_correctly() {
1085        let gs = setup().await;
1086        let resolver = EntityResolver::new(&gs);
1087
1088        // "Person" (title case from LLM) should parse as EntityType::Person, not fall back to Concept
1089        let (id, _) = resolver
1090            .resolve("test_entity", "Person", None)
1091            .await
1092            .unwrap();
1093        assert!(id > 0);
1094
1095        let entity = gs
1096            .find_entity("test_entity", EntityType::Person)
1097            .await
1098            .unwrap();
1099        assert!(entity.is_some(), "entity should be stored as Person type");
1100    }
1101
1102    #[tokio::test]
1103    async fn resolve_entity_type_all_caps_parsed_correctly() {
1104        let gs = setup().await;
1105        let resolver = EntityResolver::new(&gs);
1106
1107        let (id, _) = resolver.resolve("my_lang", "LANGUAGE", None).await.unwrap();
1108        assert!(id > 0);
1109
1110        let entity = gs
1111            .find_entity("my_lang", EntityType::Language)
1112            .await
1113            .unwrap();
1114        assert!(entity.is_some(), "entity should be stored as Language type");
1115    }
1116
1117    // ── SEC-GRAPH-01: entity name length cap ──────────────────────────────────
1118
1119    #[tokio::test]
1120    async fn resolve_truncates_long_entity_name() {
1121        let gs = setup().await;
1122        let resolver = EntityResolver::new(&gs);
1123
1124        let long_name = "a".repeat(1024);
1125        let (id, _) = resolver.resolve(&long_name, "concept", None).await.unwrap();
1126        assert!(id > 0);
1127
1128        // Entity should exist with a truncated name (512 bytes)
1129        let entity = gs
1130            .find_entity(&"a".repeat(512), EntityType::Concept)
1131            .await
1132            .unwrap();
1133        assert!(entity.is_some(), "truncated name should be stored");
1134    }
1135
1136    // ── SEC-GRAPH-02: control character stripping ─────────────────────────────
1137
1138    #[tokio::test]
1139    async fn resolve_strips_control_chars_from_name() {
1140        let gs = setup().await;
1141        let resolver = EntityResolver::new(&gs);
1142
1143        // Name with null byte and a BiDi override
1144        let name_with_ctrl = "rust\x00lang";
1145        let (id, _) = resolver
1146            .resolve(name_with_ctrl, "language", None)
1147            .await
1148            .unwrap();
1149        assert!(id > 0);
1150
1151        // Stored name should have control chars removed
1152        let entity = gs
1153            .find_entity("rustlang", EntityType::Language)
1154            .await
1155            .unwrap();
1156        assert!(
1157            entity.is_some(),
1158            "control chars should be stripped from stored name"
1159        );
1160    }
1161
1162    #[tokio::test]
1163    async fn resolve_strips_bidi_overrides_from_name() {
1164        let gs = setup().await;
1165        let resolver = EntityResolver::new(&gs);
1166
1167        // U+202E is RIGHT-TO-LEFT OVERRIDE — a BiDi spoof character
1168        let name_with_bidi = "rust\u{202E}lang";
1169        let (id, _) = resolver
1170            .resolve(name_with_bidi, "language", None)
1171            .await
1172            .unwrap();
1173        assert!(id > 0);
1174
1175        let entity = gs
1176            .find_entity("rustlang", EntityType::Language)
1177            .await
1178            .unwrap();
1179        assert!(entity.is_some(), "BiDi override chars should be stripped");
1180    }
1181
1182    // ── Helper unit tests for sanitization functions ──────────────────────────
1183
1184    #[test]
1185    fn strip_control_chars_removes_ascii_controls() {
1186        assert_eq!(strip_control_chars("hello\x00world"), "helloworld");
1187        assert_eq!(strip_control_chars("tab\there"), "tabhere");
1188        assert_eq!(strip_control_chars("new\nline"), "newline");
1189    }
1190
1191    #[test]
1192    fn strip_control_chars_removes_bidi() {
1193        let bidi = "\u{202E}spoof";
1194        assert_eq!(strip_control_chars(bidi), "spoof");
1195    }
1196
1197    #[test]
1198    fn strip_control_chars_preserves_normal_unicode() {
1199        assert_eq!(strip_control_chars("привет мир"), "привет мир");
1200        assert_eq!(strip_control_chars("日本語"), "日本語");
1201    }
1202
1203    #[test]
1204    fn truncate_to_bytes_exact_boundary() {
1205        let s = "hello";
1206        assert_eq!(truncate_to_bytes(s, 5), "hello");
1207        assert_eq!(truncate_to_bytes(s, 3), "hel");
1208    }
1209
1210    #[test]
1211    fn truncate_to_bytes_respects_utf8_boundary() {
1212        // "é" is 2 bytes in UTF-8 — truncating at 1 byte should give ""
1213        let s = "élan";
1214        let truncated = truncate_to_bytes(s, 1);
1215        assert!(s.is_char_boundary(truncated.len()));
1216    }
1217
1218    // ── New tests: embedding-based resolution ─────────────────────────────────
1219
1220    #[tokio::test]
1221    async fn resolve_with_embedding_store_score_above_threshold_merges() {
1222        let (gs, emb) = setup_with_embedding().await;
1223        // Pre-insert an existing entity (different name to avoid exact match).
1224        // "python programming lang" is in Qdrant; we resolve "python scripting lang"
1225        // which embeds to the identical vector → cosine similarity = 1.0 > 0.85 → merge.
1226        let existing_id = gs
1227            .upsert_entity(
1228                "python programming lang",
1229                "python programming lang",
1230                EntityType::Language,
1231                Some("a programming language"),
1232            )
1233            .await
1234            .unwrap();
1235
1236        let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1237        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1238            .await
1239            .unwrap();
1240        let payload = serde_json::json!({
1241            "entity_id": existing_id,
1242            "name": "python programming lang",
1243            "entity_type": "language",
1244            "summary": "a programming language",
1245        });
1246        let point_id = emb
1247            .store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1248            .await
1249            .unwrap();
1250        gs.set_entity_qdrant_point_id(existing_id, &point_id)
1251            .await
1252            .unwrap();
1253
1254        // Mock provider returns the same vector for any text → cosine similarity = 1.0 > 0.85
1255        let provider = make_mock_provider_with_embedding(mock_vec);
1256        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1257
1258        let resolver = EntityResolver::new(&gs)
1259            .with_embedding_store(&emb)
1260            .with_provider(&any_provider)
1261            .with_thresholds(0.85, 0.70);
1262
1263        // Resolve a different name — no exact match, embedding match wins
1264        let (id, outcome) = resolver
1265            .resolve(
1266                "python scripting lang",
1267                "language",
1268                Some("scripting language"),
1269            )
1270            .await
1271            .unwrap();
1272
1273        assert_eq!(id, existing_id, "should return existing entity ID on merge");
1274        assert!(
1275            matches!(outcome, ResolutionOutcome::EmbeddingMatch { score } if score > 0.85),
1276            "outcome should be EmbeddingMatch with score > 0.85, got {outcome:?}"
1277        );
1278    }
1279
1280    #[tokio::test]
1281    async fn resolve_with_embedding_store_score_below_ambiguous_creates_new() {
1282        let (gs, emb) = setup_with_embedding().await;
1283        // Insert existing entity with orthogonal vector
1284        let existing_id = gs
1285            .upsert_entity("java", "java", EntityType::Language, Some("java language"))
1286            .await
1287            .unwrap();
1288
1289        // Existing uses [1,0,0,0]; new entity will embed to [0,1,0,0] (orthogonal, score=0)
1290        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1291            .await
1292            .unwrap();
1293        let payload = serde_json::json!({
1294            "entity_id": existing_id,
1295            "name": "java",
1296            "entity_type": "language",
1297            "summary": "java language",
1298        });
1299        emb.store_to_collection(ENTITY_COLLECTION, payload, vec![1.0, 0.0, 0.0, 0.0])
1300            .await
1301            .unwrap();
1302
1303        // Mock returns orthogonal vector → score = 0.0 < 0.70
1304        let provider = make_mock_provider_with_embedding(vec![0.0, 1.0, 0.0, 0.0]);
1305        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1306
1307        let resolver = EntityResolver::new(&gs)
1308            .with_embedding_store(&emb)
1309            .with_provider(&any_provider)
1310            .with_thresholds(0.85, 0.70);
1311
1312        let (id, outcome) = resolver
1313            .resolve("kotlin", "language", Some("kotlin language"))
1314            .await
1315            .unwrap();
1316
1317        assert_ne!(id, existing_id, "orthogonal entity should create new");
1318        assert_eq!(outcome, ResolutionOutcome::Created);
1319    }
1320
1321    #[tokio::test]
1322    async fn resolve_with_embedding_failure_falls_back_to_create() {
1323        // Use a mock with supports_embeddings=false — embed() returns EmbedUnsupported error,
1324        // which triggers the fallback path (create new entity).
1325        let sqlite2 = SqliteStore::new(":memory:").await.unwrap();
1326        let pool2 = sqlite2.pool().clone();
1327        let mem2 = Box::new(InMemoryVectorStore::new());
1328        let emb2 = Arc::new(EmbeddingStore::with_store(mem2, pool2));
1329        let gs2 = GraphStore::new(sqlite2.pool().clone());
1330
1331        let mut mock = zeph_llm::mock::MockProvider::default();
1332        mock.supports_embeddings = false;
1333        let any_provider = zeph_llm::any::AnyProvider::Mock(mock);
1334
1335        let resolver = EntityResolver::new(&gs2)
1336            .with_embedding_store(&emb2)
1337            .with_provider(&any_provider);
1338
1339        let (id, outcome) = resolver
1340            .resolve("testentity", "concept", Some("summary"))
1341            .await
1342            .unwrap();
1343        assert!(id > 0);
1344        assert_eq!(outcome, ResolutionOutcome::Created);
1345    }
1346
1347    #[tokio::test]
1348    async fn resolve_fallback_increments_counter() {
1349        let (gs, emb) = setup_with_embedding().await;
1350
1351        // Provider with embed that fails (supports_embeddings=false → EmbedUnsupported error)
1352        let mut mock = zeph_llm::mock::MockProvider::default();
1353        mock.supports_embeddings = false;
1354        let any_provider = zeph_llm::any::AnyProvider::Mock(mock);
1355
1356        let resolver = EntityResolver::new(&gs)
1357            .with_embedding_store(&emb)
1358            .with_provider(&any_provider);
1359
1360        let fallback_count = resolver.fallback_count();
1361
1362        // First call: embed fails → fallback
1363        resolver.resolve("entity_a", "concept", None).await.unwrap();
1364
1365        assert_eq!(
1366            fallback_count.load(std::sync::atomic::Ordering::Relaxed),
1367            1,
1368            "fallback counter should be 1 after embed failure"
1369        );
1370    }
1371
1372    #[tokio::test]
1373    async fn resolve_batch_processes_multiple_entities() {
1374        let gs = setup().await;
1375        let resolver = EntityResolver::new(&gs);
1376
1377        let entities = vec![
1378            ExtractedEntity {
1379                name: "rust".into(),
1380                entity_type: "language".into(),
1381                summary: Some("systems language".into()),
1382            },
1383            ExtractedEntity {
1384                name: "python".into(),
1385                entity_type: "language".into(),
1386                summary: None,
1387            },
1388            ExtractedEntity {
1389                name: "cargo".into(),
1390                entity_type: "tool".into(),
1391                summary: Some("rust build tool".into()),
1392            },
1393        ];
1394
1395        let results = resolver.resolve_batch(&entities).await.unwrap();
1396        assert_eq!(results.len(), 3);
1397        for (id, outcome) in &results {
1398            assert!(*id > 0);
1399            assert_eq!(*outcome, ResolutionOutcome::Created);
1400        }
1401    }
1402
1403    #[tokio::test]
1404    async fn resolve_batch_empty_returns_empty() {
1405        let gs = setup().await;
1406        let resolver = EntityResolver::new(&gs);
1407        let results = resolver.resolve_batch(&[]).await.unwrap();
1408        assert!(results.is_empty());
1409    }
1410
1411    #[tokio::test]
1412    async fn merge_combines_summaries() {
1413        let (gs, emb) = setup_with_embedding().await;
1414        // Use a different name for the existing entity so exact match doesn't trigger.
1415        // "mergetest v1" is stored with embedding; we then resolve "mergetest v2" which
1416        // embeds to the same vector → similarity = 1.0 > threshold → merge.
1417        let existing_id = gs
1418            .upsert_entity(
1419                "mergetest v1",
1420                "mergetest v1",
1421                EntityType::Concept,
1422                Some("first summary"),
1423            )
1424            .await
1425            .unwrap();
1426
1427        let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1428        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1429            .await
1430            .unwrap();
1431        let payload = serde_json::json!({
1432            "entity_id": existing_id,
1433            "name": "mergetest v1",
1434            "entity_type": "concept",
1435            "summary": "first summary",
1436        });
1437        let point_id = emb
1438            .store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1439            .await
1440            .unwrap();
1441        gs.set_entity_qdrant_point_id(existing_id, &point_id)
1442            .await
1443            .unwrap();
1444
1445        let provider = make_mock_provider_with_embedding(mock_vec);
1446        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1447
1448        let resolver = EntityResolver::new(&gs)
1449            .with_embedding_store(&emb)
1450            .with_provider(&any_provider)
1451            .with_thresholds(0.85, 0.70);
1452
1453        // Resolve "mergetest v2" — no exact match, but embedding is identical → merge
1454        let (id, outcome) = resolver
1455            .resolve("mergetest v2", "concept", Some("second summary"))
1456            .await
1457            .unwrap();
1458
1459        assert_eq!(id, existing_id);
1460        assert!(matches!(outcome, ResolutionOutcome::EmbeddingMatch { .. }));
1461
1462        // Verify the merged summary was updated on the existing entity
1463        let entity = gs
1464            .find_entity("mergetest v1", EntityType::Concept)
1465            .await
1466            .unwrap()
1467            .unwrap();
1468        let summary = entity.summary.unwrap_or_default();
1469        assert!(
1470            summary.contains("first summary") && summary.contains("second summary"),
1471            "merged summary should contain both: got {summary:?}"
1472        );
1473    }
1474
1475    #[tokio::test]
1476    async fn merge_preserves_older_entity_id() {
1477        let (gs, emb) = setup_with_embedding().await;
1478        // "legacy entity" stored with embedding; "legacy entity variant" has same vector → merge
1479        let existing_id = gs
1480            .upsert_entity(
1481                "legacy entity",
1482                "legacy entity",
1483                EntityType::Concept,
1484                Some("old info"),
1485            )
1486            .await
1487            .unwrap();
1488
1489        let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1490        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1491            .await
1492            .unwrap();
1493        let payload = serde_json::json!({
1494            "entity_id": existing_id,
1495            "name": "legacy entity",
1496            "entity_type": "concept",
1497            "summary": "old info",
1498        });
1499        emb.store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1500            .await
1501            .unwrap();
1502
1503        let provider = make_mock_provider_with_embedding(mock_vec);
1504        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1505
1506        let resolver = EntityResolver::new(&gs)
1507            .with_embedding_store(&emb)
1508            .with_provider(&any_provider)
1509            .with_thresholds(0.85, 0.70);
1510
1511        let (returned_id, _) = resolver
1512            .resolve("legacy entity variant", "concept", Some("new info"))
1513            .await
1514            .unwrap();
1515
1516        assert_eq!(
1517            returned_id, existing_id,
1518            "older entity ID should be preserved on merge"
1519        );
1520    }
1521
1522    #[tokio::test]
1523    async fn entity_type_filter_prevents_cross_type_merge() {
1524        let (gs, emb) = setup_with_embedding().await;
1525
1526        // Insert a Person named "python"
1527        let person_id = gs
1528            .upsert_entity(
1529                "python",
1530                "python",
1531                EntityType::Person,
1532                Some("a person named python"),
1533            )
1534            .await
1535            .unwrap();
1536
1537        let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1538        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1539            .await
1540            .unwrap();
1541        let payload = serde_json::json!({
1542            "entity_id": person_id,
1543            "name": "python",
1544            "entity_type": "person",
1545            "summary": "a person named python",
1546        });
1547        emb.store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1548            .await
1549            .unwrap();
1550
1551        let provider = make_mock_provider_with_embedding(mock_vec);
1552        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1553
1554        let resolver = EntityResolver::new(&gs)
1555            .with_embedding_store(&emb)
1556            .with_provider(&any_provider)
1557            .with_thresholds(0.85, 0.70);
1558
1559        // Resolve "python" as Language — should NOT merge with the Person entity
1560        let (lang_id, outcome) = resolver
1561            .resolve("python", "language", Some("python language"))
1562            .await
1563            .unwrap();
1564
1565        // The entity_type filter should prevent merging person "python" with language "python"
1566        // Check: either created new or an exact match was found under language type
1567        assert_ne!(
1568            lang_id, person_id,
1569            "language entity should not merge with person entity"
1570        );
1571        // The entity_type filter causes no embedding candidate to survive the type filter,
1572        // so resolution falls back to creating a new entity.
1573        assert_eq!(outcome, ResolutionOutcome::Created);
1574    }
1575
1576    #[tokio::test]
1577    async fn custom_thresholds_respected() {
1578        let (gs, emb) = setup_with_embedding().await;
1579        // With a very high threshold (1.0), even identical vectors won't merge
1580        // (they'd score exactly 1.0 which is NOT > 1.0, so... let's use 0.5 threshold
1581        // and verify score below 0.5 creates new)
1582        let existing_id = gs
1583            .upsert_entity(
1584                "threshold_test",
1585                "threshold_test",
1586                EntityType::Concept,
1587                Some("base"),
1588            )
1589            .await
1590            .unwrap();
1591
1592        let existing_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1593        emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1594            .await
1595            .unwrap();
1596        let payload = serde_json::json!({
1597            "entity_id": existing_id,
1598            "name": "threshold_test",
1599            "entity_type": "concept",
1600            "summary": "base",
1601        });
1602        emb.store_to_collection(ENTITY_COLLECTION, payload, existing_vec)
1603            .await
1604            .unwrap();
1605
1606        // Orthogonal vector → score = 0.0
1607        let provider = make_mock_provider_with_embedding(vec![0.0, 1.0, 0.0, 0.0]);
1608        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1609
1610        // With thresholds 0.50/0.30, score=0 is below 0.30 → create new
1611        let resolver = EntityResolver::new(&gs)
1612            .with_embedding_store(&emb)
1613            .with_provider(&any_provider)
1614            .with_thresholds(0.50, 0.30);
1615
1616        let (id, outcome) = resolver
1617            .resolve("new_concept", "concept", Some("different"))
1618            .await
1619            .unwrap();
1620
1621        assert_ne!(id, existing_id);
1622        assert_eq!(outcome, ResolutionOutcome::Created);
1623    }
1624
1625    #[tokio::test]
1626    async fn resolve_outcome_exact_match_no_embedding_store() {
1627        let gs = setup().await;
1628        let resolver = EntityResolver::new(&gs);
1629
1630        resolver.resolve("existing", "concept", None).await.unwrap();
1631        let (_, outcome) = resolver.resolve("existing", "concept", None).await.unwrap();
1632        assert_eq!(outcome, ResolutionOutcome::ExactMatch);
1633    }
1634
1635    #[tokio::test]
1636    async fn extract_json_strips_markdown_fences() {
1637        let with_fence = "```json\n{\"same_entity\": true}\n```";
1638        let extracted = extract_json(with_fence);
1639        let parsed: DisambiguationResponse = serde_json::from_str(extracted).unwrap();
1640        assert!(parsed.same_entity);
1641
1642        let without_fence = "{\"same_entity\": false}";
1643        let extracted2 = extract_json(without_fence);
1644        let parsed2: DisambiguationResponse = serde_json::from_str(extracted2).unwrap();
1645        assert!(!parsed2.same_entity);
1646    }
1647
1648    // Helper: build a MockProvider with embeddings enabled, given vector, and queued chat responses.
1649    fn make_mock_with_embedding_and_chat(
1650        embedding: Vec<f32>,
1651        chat_responses: Vec<String>,
1652    ) -> zeph_llm::mock::MockProvider {
1653        let mut p = zeph_llm::mock::MockProvider::with_responses(chat_responses);
1654        p.embedding = embedding;
1655        p.supports_embeddings = true;
1656        p
1657    }
1658
1659    // Seed an existing entity into both SQLite and InMemoryVectorStore at a known vector.
1660    async fn seed_entity_with_vector(
1661        gs: &GraphStore,
1662        emb: &Arc<EmbeddingStore>,
1663        name: &str,
1664        entity_type: EntityType,
1665        summary: &str,
1666        vector: Vec<f32>,
1667    ) -> i64 {
1668        let id = gs
1669            .upsert_entity(name, name, entity_type, Some(summary))
1670            .await
1671            .unwrap();
1672        emb.ensure_named_collection(ENTITY_COLLECTION, u64::try_from(vector.len()).unwrap())
1673            .await
1674            .unwrap();
1675        let payload = serde_json::json!({
1676            "entity_id": id,
1677            "name": name,
1678            "entity_type": entity_type.as_str(),
1679            "summary": summary,
1680        });
1681        let point_id = emb
1682            .store_to_collection(ENTITY_COLLECTION, payload, vector)
1683            .await
1684            .unwrap();
1685        gs.set_entity_qdrant_point_id(id, &point_id).await.unwrap();
1686        id
1687    }
1688
1689    // ── GAP-1: ambiguous score + LLM says same_entity=true → LlmDisambiguated ─
1690
1691    #[tokio::test]
1692    async fn resolve_ambiguous_score_llm_says_merge() {
1693        // existing entity at [1,0,0,0]; new entity embeds to [1,1,0,0] → cosine ≈ 0.707
1694        // thresholds: similarity=0.85, ambiguous=0.50 → score 0.707 is in [0.50, 0.85)
1695        let (gs, emb) = setup_with_embedding().await;
1696        let existing_id = seed_entity_with_vector(
1697            &gs,
1698            &emb,
1699            "goroutine",
1700            EntityType::Concept,
1701            "go concurrency primitive",
1702            vec![1.0, 0.0, 0.0, 0.0],
1703        )
1704        .await;
1705
1706        // LLM responds with same_entity=true → should merge
1707        let provider = make_mock_with_embedding_and_chat(
1708            vec![1.0, 1.0, 0.0, 0.0],
1709            vec![r#"{"same_entity": true}"#.to_owned()],
1710        );
1711        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1712
1713        let resolver = EntityResolver::new(&gs)
1714            .with_embedding_store(&emb)
1715            .with_provider(&any_provider)
1716            .with_thresholds(0.85, 0.50);
1717
1718        let (id, outcome) = resolver
1719            .resolve("goroutine concurrency", "concept", Some("go concurrency"))
1720            .await
1721            .unwrap();
1722
1723        assert_eq!(
1724            id, existing_id,
1725            "should return existing entity ID on LLM merge"
1726        );
1727        assert_eq!(outcome, ResolutionOutcome::LlmDisambiguated);
1728    }
1729
1730    // ── GAP-2: ambiguous score + LLM says same_entity=false → Created ──────────
1731
1732    #[tokio::test]
1733    async fn resolve_ambiguous_score_llm_says_different() {
1734        let (gs, emb) = setup_with_embedding().await;
1735        let existing_id = seed_entity_with_vector(
1736            &gs,
1737            &emb,
1738            "channel",
1739            EntityType::Concept,
1740            "go channel",
1741            vec![1.0, 0.0, 0.0, 0.0],
1742        )
1743        .await;
1744
1745        // LLM responds with same_entity=false → should create new entity
1746        let provider = make_mock_with_embedding_and_chat(
1747            vec![1.0, 1.0, 0.0, 0.0],
1748            vec![r#"{"same_entity": false}"#.to_owned()],
1749        );
1750        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1751
1752        let resolver = EntityResolver::new(&gs)
1753            .with_embedding_store(&emb)
1754            .with_provider(&any_provider)
1755            .with_thresholds(0.85, 0.50);
1756
1757        let (id, outcome) = resolver
1758            .resolve("network channel", "concept", Some("networking channel"))
1759            .await
1760            .unwrap();
1761
1762        assert_ne!(
1763            id, existing_id,
1764            "LLM-rejected match should create new entity"
1765        );
1766        assert_eq!(outcome, ResolutionOutcome::Created);
1767    }
1768
1769    // ── GAP-3: ambiguous score + LLM chat fails → fallback counter incremented ─
1770
1771    #[tokio::test]
1772    async fn resolve_ambiguous_score_llm_failure_increments_fallback() {
1773        let (gs, emb) = setup_with_embedding().await;
1774        let existing_id = seed_entity_with_vector(
1775            &gs,
1776            &emb,
1777            "mutex",
1778            EntityType::Concept,
1779            "mutual exclusion lock",
1780            vec![1.0, 0.0, 0.0, 0.0],
1781        )
1782        .await;
1783
1784        // fail_chat=true → provider.chat() returns Err → None from llm_disambiguate
1785        let mut provider = make_mock_with_embedding_and_chat(vec![1.0, 1.0, 0.0, 0.0], vec![]);
1786        provider.fail_chat = true;
1787        let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1788
1789        let resolver = EntityResolver::new(&gs)
1790            .with_embedding_store(&emb)
1791            .with_provider(&any_provider)
1792            .with_thresholds(0.85, 0.50);
1793
1794        let fallback_count = resolver.fallback_count();
1795
1796        let (id, outcome) = resolver
1797            .resolve("mutex lock", "concept", Some("synchronization primitive"))
1798            .await
1799            .unwrap();
1800
1801        // LLM failure → fallback to create new
1802        assert_ne!(
1803            id, existing_id,
1804            "LLM failure should create new entity (fallback)"
1805        );
1806        assert_eq!(outcome, ResolutionOutcome::Created);
1807        assert_eq!(
1808            fallback_count.load(std::sync::atomic::Ordering::Relaxed),
1809            1,
1810            "fallback counter should be incremented on LLM chat failure"
1811        );
1812    }
1813
1814    // ── Canonicalization / alias tests ────────────────────────────────────────
1815
1816    #[tokio::test]
1817    async fn resolve_creates_entity_with_canonical_name() {
1818        let gs = setup().await;
1819        let resolver = EntityResolver::new(&gs);
1820        let (id, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1821        assert!(id > 0);
1822        let entity = gs
1823            .find_entity("rust", EntityType::Language)
1824            .await
1825            .unwrap()
1826            .unwrap();
1827        assert_eq!(entity.canonical_name, "rust");
1828    }
1829
1830    #[tokio::test]
1831    async fn resolve_adds_alias_on_create() {
1832        let gs = setup().await;
1833        let resolver = EntityResolver::new(&gs);
1834        let (id, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1835        let aliases = gs.aliases_for_entity(id).await.unwrap();
1836        assert!(
1837            !aliases.is_empty(),
1838            "new entity should have at least one alias"
1839        );
1840        assert!(aliases.iter().any(|a| a.alias_name == "rust"));
1841    }
1842
1843    #[tokio::test]
1844    async fn resolve_reuses_entity_by_alias() {
1845        let gs = setup().await;
1846        let resolver = EntityResolver::new(&gs);
1847
1848        // Create entity and register an alias
1849        let (id1, _) = resolver.resolve("rust", "language", None).await.unwrap();
1850        gs.add_alias(id1, "rust-lang").await.unwrap();
1851
1852        // Resolve using the alias — should return the same entity
1853        let (id2, _) = resolver
1854            .resolve("rust-lang", "language", None)
1855            .await
1856            .unwrap();
1857        assert_eq!(
1858            id1, id2,
1859            "'rust-lang' alias should resolve to same entity as 'rust'"
1860        );
1861    }
1862
1863    #[tokio::test]
1864    async fn resolve_alias_match_respects_entity_type() {
1865        let gs = setup().await;
1866        let resolver = EntityResolver::new(&gs);
1867
1868        // "python" as a Language
1869        let (lang_id, _) = resolver.resolve("python", "language", None).await.unwrap();
1870
1871        // "python" as a Tool should create a separate entity (different type)
1872        let (tool_id, _) = resolver.resolve("python", "tool", None).await.unwrap();
1873        assert_ne!(
1874            lang_id, tool_id,
1875            "same name with different type should be separate entities"
1876        );
1877    }
1878
1879    #[tokio::test]
1880    async fn resolve_preserves_existing_aliases() {
1881        let gs = setup().await;
1882        let resolver = EntityResolver::new(&gs);
1883
1884        let (id, _) = resolver.resolve("rust", "language", None).await.unwrap();
1885        gs.add_alias(id, "rust-lang").await.unwrap();
1886
1887        // Upserting same entity should not remove prior aliases
1888        resolver
1889            .resolve("rust", "language", Some("updated"))
1890            .await
1891            .unwrap();
1892        let aliases = gs.aliases_for_entity(id).await.unwrap();
1893        assert!(
1894            aliases.iter().any(|a| a.alias_name == "rust-lang"),
1895            "prior alias must be preserved"
1896        );
1897    }
1898
1899    #[tokio::test]
1900    async fn resolve_original_form_registered_as_alias() {
1901        let gs = setup().await;
1902        let resolver = EntityResolver::new(&gs);
1903
1904        // "  Rust  " — original trimmed lowercased form is "rust", same as normalized
1905        // So only one alias should be registered (no duplicate)
1906        let (id, _) = resolver
1907            .resolve("  Rust  ", "language", None)
1908            .await
1909            .unwrap();
1910        let aliases = gs.aliases_for_entity(id).await.unwrap();
1911        assert!(aliases.iter().any(|a| a.alias_name == "rust"));
1912    }
1913
1914    #[tokio::test]
1915    async fn resolve_entity_with_many_aliases() {
1916        let gs = setup().await;
1917        let id = gs
1918            .upsert_entity("bigentity", "bigentity", EntityType::Concept, None)
1919            .await
1920            .unwrap();
1921        for i in 0..100 {
1922            gs.add_alias(id, &format!("alias-{i}")).await.unwrap();
1923        }
1924        let aliases = gs.aliases_for_entity(id).await.unwrap();
1925        assert_eq!(aliases.len(), 100);
1926
1927        // Fuzzy search should still work via alias
1928        let results = gs.find_entities_fuzzy("alias-50", 10).await.unwrap();
1929        assert!(results.iter().any(|e| e.id == id));
1930    }
1931}