Skip to main content

zeph_memory/graph/resolver/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_common::sanitize::strip_control_chars;
12use zeph_common::text::truncate_to_bytes_ref;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{LlmProvider as _, Message, Role};
15
16use super::store::GraphStore;
17use super::types::EntityType;
18use crate::embedding_store::EmbeddingStore;
19use crate::error::MemoryError;
20use crate::graph::extractor::ExtractedEntity;
21use crate::types::MessageId;
22use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
23
/// Shortest accepted entity name, in bytes, measured after normalization.
///
/// Shorter names (e.g. "go", "cd") are treated as noise tokens and rejected.
const MIN_ENTITY_NAME_BYTES: usize = 3;
/// Longest entity name kept in the graph, in bytes; longer names are truncated.
const MAX_ENTITY_NAME_BYTES: usize = 512;
/// Upper bound, in bytes, for relation strings.
const MAX_RELATION_BYTES: usize = 256;
/// Upper bound, in bytes, for fact and summary strings.
const MAX_FACT_BYTES: usize = 2048;

/// Name of the Qdrant collection holding entity embeddings.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Seconds a single `embed()` call may run before it is abandoned.
const EMBED_TIMEOUT_SECS: u64 = 30;
38
/// How entity resolution arrived at the entity id it returned.
#[derive(Clone, Debug, PartialEq)]
pub enum ResolutionOutcome {
    /// An existing entity of the same type matched exactly, by alias or canonical name.
    ExactMatch,
    /// An existing entity was merged because cosine similarity reached the merge threshold.
    EmbeddingMatch {
        /// Cosine similarity between the new mention and the matched entity.
        score: f32,
    },
    /// Similarity fell in the ambiguous range and an LLM confirmed the merge.
    LlmDisambiguated,
    /// No existing entity was accepted, so a new one was created.
    Created,
}
51
52/// LLM response for entity disambiguation.
53#[derive(Debug, Deserialize, JsonSchema)]
54struct DisambiguationResponse {
55    same_entity: bool,
56}
57
58/// Per-entity-name lock guard to prevent concurrent duplicate creation.
59///
60/// Keyed by normalized entity name. Entities with different names resolve concurrently;
61/// entities with the same name are serialized.
62///
63/// TODO(SEC-M33-02): This map grows unboundedly — one entry per unique normalized name.
64/// For a short-lived resolver this is acceptable. If the resolver becomes long-lived
65/// (stored in `SemanticMemory`), add eviction or use a fixed-size sharded lock array.
66type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
67
68pub struct EntityResolver<'a> {
69    store: &'a GraphStore,
70    embedding_store: Option<&'a Arc<EmbeddingStore>>,
71    provider: Option<&'a AnyProvider>,
72    similarity_threshold: f32,
73    ambiguous_threshold: f32,
74    name_locks: NameLockMap,
75    /// Counter for error-triggered fallbacks (embed/LLM failures). Tests can read this via Arc.
76    fallback_count: Arc<std::sync::atomic::AtomicU64>,
77    /// Ensures `ensure_named_collection()` is called at most once per resolver lifetime.
78    ///
79    /// Arc-wrapped so future clones or spawned tasks can share the same gate.
80    collection_ensured: Arc<tokio::sync::OnceCell<()>>,
81}
82
83impl<'a> EntityResolver<'a> {
84    /// Returns a reference to the underlying graph store.
85    #[must_use]
86    pub fn graph_store(&self) -> &GraphStore {
87        self.store
88    }
89
90    #[must_use]
91    pub fn new(store: &'a GraphStore) -> Self {
92        Self {
93            store,
94            embedding_store: None,
95            provider: None,
96            similarity_threshold: 0.85,
97            ambiguous_threshold: 0.70,
98            name_locks: Arc::new(DashMap::new()),
99            fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
100            collection_ensured: Arc::new(tokio::sync::OnceCell::new()),
101        }
102    }
103
104    #[must_use]
105    pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
106        self.embedding_store = Some(store);
107        self
108    }
109
110    #[must_use]
111    pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
112        self.provider = Some(provider);
113        self
114    }
115
116    #[must_use]
117    pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
118        self.similarity_threshold = similarity;
119        self.ambiguous_threshold = ambiguous;
120        self
121    }
122
123    /// Shared fallback counter — tests can clone this Arc to inspect the value.
124    #[must_use]
125    pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
126        Arc::clone(&self.fallback_count)
127    }
128
129    /// Normalize an entity name: trim, lowercase, strip control chars, truncate.
130    fn normalize_name(name: &str) -> String {
131        let lowered = name.trim().to_lowercase();
132        let cleaned = strip_control_chars(&lowered);
133        let normalized = truncate_to_bytes_ref(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
134        if normalized.len() < cleaned.len() {
135            tracing::debug!(
136                "graph resolver: entity name truncated to {} bytes",
137                MAX_ENTITY_NAME_BYTES
138            );
139        }
140        normalized
141    }
142
143    /// Parse an entity type string, falling back to `Concept` on unknown values.
144    fn parse_entity_type(entity_type: &str) -> EntityType {
145        entity_type
146            .trim()
147            .to_lowercase()
148            .parse::<EntityType>()
149            .unwrap_or_else(|_| {
150                tracing::debug!(
151                    "graph resolver: unknown entity type {:?}, falling back to Concept",
152                    entity_type
153                );
154                EntityType::Concept
155            })
156    }
157
158    /// Acquire the per-name lock and return the guard. Keeps lock alive for the caller.
159    async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
160        let lock = self
161            .name_locks
162            .entry(normalized.to_owned())
163            .or_insert_with(|| Arc::new(Mutex::new(())))
164            .clone();
165        lock.lock_owned().await
166    }
167
168    /// Resolve an extracted entity using the alias-first canonicalization pipeline.
169    ///
170    /// Pipeline:
171    /// 1. Normalize: trim, lowercase, strip control chars, truncate to 512 bytes.
172    /// 2. Parse entity type (fallback to Concept on unknown).
173    /// 3. Alias lookup: search `graph_entity_aliases` by normalized name + `entity_type`.
174    ///    If found, touch `last_seen_at` and return the existing entity id.
175    /// 4. Canonical name lookup: search `graph_entities` by `canonical_name` + `entity_type`.
176    ///    If found, touch `last_seen_at` and return the existing entity id.
177    /// 5. When `embedding_store` and `provider` are configured, performs embedding-based fuzzy
178    ///    matching: cosine similarity search (Qdrant), LLM disambiguation for ambiguous range,
179    ///    merge or create based on result. Failures degrade gracefully to step 6.
180    /// 6. Create: upsert new entity with `canonical_name` = normalized name.
181    /// 7. Register the normalized form (and original trimmed form if different) as aliases.
182    ///
183    /// # Errors
184    ///
185    /// Returns an error if the entity name is empty after normalization, or if a DB operation fails.
186    pub async fn resolve(
187        &self,
188        name: &str,
189        entity_type: &str,
190        summary: Option<&str>,
191    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
192        let normalized = Self::normalize_name(name);
193
194        if normalized.is_empty() {
195            return Err(MemoryError::GraphStore("empty entity name".into()));
196        }
197
198        if normalized.len() < MIN_ENTITY_NAME_BYTES {
199            return Err(MemoryError::GraphStore(format!(
200                "entity name too short: {normalized:?} ({} bytes, min {MIN_ENTITY_NAME_BYTES})",
201                normalized.len()
202            )));
203        }
204
205        let et = Self::parse_entity_type(entity_type);
206
207        // The surface form preserves the original casing for user-facing display.
208        let surface_name = name.trim().to_owned();
209
210        // Acquire per-name lock to prevent concurrent duplicate creation.
211        let _guard = self.lock_name(&normalized).await;
212
213        // Step 3: alias-first lookup (filters by entity_type to prevent cross-type collisions).
214        if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
215            self.store
216                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
217                .await?;
218            return Ok((entity.id.0, ResolutionOutcome::ExactMatch));
219        }
220
221        // Step 4: canonical name lookup.
222        if let Some(entity) = self.store.find_entity(&normalized, et).await? {
223            self.store
224                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
225                .await?;
226            return Ok((entity.id.0, ResolutionOutcome::ExactMatch));
227        }
228
229        // Step 5: Embedding-based resolution (when configured).
230        if let Some(outcome) = self
231            .resolve_via_embedding(&normalized, name, &surface_name, et, summary)
232            .await?
233        {
234            return Ok(outcome);
235        }
236
237        // Step 6: Create new entity (no embedding store, or embedding failure).
238        let entity_id = self
239            .store
240            .upsert_entity(&surface_name, &normalized, et, summary)
241            .await?;
242
243        self.register_aliases(entity_id.0, &normalized, name)
244            .await?;
245
246        Ok((entity_id.0, ResolutionOutcome::Created))
247    }
248
249    /// Compute embedding for an entity, incrementing `fallback_count` on failure/timeout.
250    /// Returns `None` when embedding is unavailable (caller should skip vector operations).
251    async fn embed_entity_text(
252        &self,
253        provider: &AnyProvider,
254        normalized: &str,
255        summary: Option<&str>,
256    ) -> Option<Vec<f32>> {
257        let safe_summary = truncate_to_bytes_ref(summary.unwrap_or(""), MAX_FACT_BYTES);
258        let embed_text = format!("{normalized}: {safe_summary}");
259        let embed_result = tokio::time::timeout(
260            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
261            provider.embed(&embed_text),
262        )
263        .await;
264        match embed_result {
265            Ok(Ok(v)) => Some(v),
266            Ok(Err(err)) => {
267                self.fallback_count
268                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
269                tracing::warn!(entity_name = %normalized, error = %err,
270                    "embed() failed; falling back to exact-match-only entity creation");
271                None
272            }
273            Err(_timeout) => {
274                self.fallback_count
275                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
276                tracing::warn!(entity_name = %normalized,
277                    "embed() timed out after {}s; falling back to create new entity",
278                    EMBED_TIMEOUT_SECS);
279                None
280            }
281        }
282    }
283
284    /// Handle a candidate in the ambiguous score range by running LLM disambiguation.
285    /// Returns `Ok(Some(...))` if the LLM confirms a match, `Ok(None)` to fall through to create.
286    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
287    async fn handle_ambiguous_candidate(
288        &self,
289        emb_store: &EmbeddingStore,
290        provider: &AnyProvider,
291        payload: &std::collections::HashMap<String, serde_json::Value>,
292        point_id: &str,
293        score: f32,
294        surface_name: &str,
295        normalized: &str,
296        et: EntityType,
297        summary: Option<&str>,
298    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
299        let entity_id = payload
300            .get("entity_id")
301            .and_then(serde_json::Value::as_i64)
302            .ok_or_else(|| MemoryError::GraphStore("missing entity_id in payload".into()))?;
303        let existing_name = payload
304            .get("name")
305            .and_then(|v| v.as_str())
306            .unwrap_or("")
307            .to_owned();
308        let existing_summary = payload
309            .get("summary")
310            .and_then(|v| v.as_str())
311            .unwrap_or("")
312            .to_owned();
313        // Use the existing entity's actual type from the payload (IC-S3)
314        let existing_type = payload
315            .get("entity_type")
316            .and_then(|v| v.as_str())
317            .unwrap_or(et.as_str())
318            .to_owned();
319        let existing_canonical = payload.get("canonical_name").and_then(|v| v.as_str());
320        let existing_summary_str = payload.get("summary").and_then(|v| v.as_str());
321        match self
322            .llm_disambiguate(
323                provider,
324                normalized,
325                et.as_str(),
326                summary.unwrap_or(""),
327                &existing_name,
328                &existing_type,
329                &existing_summary,
330                score,
331            )
332            .await
333        {
334            Some(true) => {
335                self.merge_entity(
336                    emb_store,
337                    provider,
338                    entity_id,
339                    surface_name,
340                    normalized,
341                    et,
342                    summary,
343                    existing_canonical,
344                    existing_summary_str,
345                    Some(point_id),
346                )
347                .await?;
348                Ok(Some((entity_id, ResolutionOutcome::LlmDisambiguated)))
349            }
350            Some(false) => Ok(None),
351            None => {
352                self.fallback_count
353                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
354                tracing::warn!(entity_name = %normalized,
355                    "LLM disambiguation failed; falling back to create new entity");
356                Ok(None)
357            }
358        }
359    }
360
361    /// Attempt embedding-based resolution. Returns `Ok(Some(...))` if resolved (early return),
362    /// `Ok(None)` if no match found (caller should fall through to create), or `Err` on DB error.
363    async fn resolve_via_embedding(
364        &self,
365        normalized: &str,
366        original_name: &str,
367        surface_name: &str,
368        et: EntityType,
369        summary: Option<&str>,
370    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
371        let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) else {
372            return Ok(None);
373        };
374
375        let Some(query_vec) = self.embed_entity_text(provider, normalized, summary).await else {
376            return Ok(None);
377        };
378
379        let type_filter = VectorFilter {
380            must: vec![FieldCondition {
381                field: "entity_type".into(),
382                value: FieldValue::Text(et.as_str().to_owned()),
383            }],
384            must_not: vec![],
385        };
386        let candidates = match emb_store
387            .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
388            .await
389        {
390            Ok(c) => c,
391            Err(err) => {
392                self.fallback_count
393                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
394                tracing::warn!(entity_name = %normalized, error = %err,
395                    "Qdrant search failed; falling back to create new entity");
396                return self
397                    .create_with_embedding(
398                        emb_store,
399                        surface_name,
400                        normalized,
401                        original_name,
402                        et,
403                        summary,
404                        &query_vec,
405                    )
406                    .await
407                    .map(Some);
408            }
409        };
410
411        if let Some(best) = candidates.first() {
412            let score = best.score;
413            if score >= self.similarity_threshold {
414                let entity_id = best
415                    .payload
416                    .get("entity_id")
417                    .and_then(serde_json::Value::as_i64)
418                    .ok_or_else(|| {
419                        MemoryError::GraphStore("missing entity_id in payload".into())
420                    })?;
421                let existing_canonical =
422                    best.payload.get("canonical_name").and_then(|v| v.as_str());
423                let existing_summary = best.payload.get("summary").and_then(|v| v.as_str());
424                let existing_pid = Some(best.id.as_str());
425                self.merge_entity(
426                    emb_store,
427                    provider,
428                    entity_id,
429                    surface_name,
430                    normalized,
431                    et,
432                    summary,
433                    existing_canonical,
434                    existing_summary,
435                    existing_pid,
436                )
437                .await?;
438                return Ok(Some((
439                    entity_id,
440                    ResolutionOutcome::EmbeddingMatch { score },
441                )));
442            } else if score >= self.ambiguous_threshold
443                && let Some(result) = self
444                    .handle_ambiguous_candidate(
445                        emb_store,
446                        provider,
447                        &best.payload,
448                        &best.id,
449                        score,
450                        surface_name,
451                        normalized,
452                        et,
453                        summary,
454                    )
455                    .await?
456            {
457                return Ok(Some(result));
458            }
459            // score < ambiguous_threshold or LLM said different: fall through to create with embedding
460        }
461
462        // No suitable match — create new entity and store embedding.
463        self.create_with_embedding(
464            emb_store,
465            surface_name,
466            normalized,
467            original_name,
468            et,
469            summary,
470            &query_vec,
471        )
472        .await
473        .map(Some)
474    }
475
476    /// Create a new entity, register aliases, and store its embedding in Qdrant.
477    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
478    async fn create_with_embedding(
479        &self,
480        emb_store: &EmbeddingStore,
481        surface_name: &str,
482        normalized: &str,
483        original_name: &str,
484        et: EntityType,
485        summary: Option<&str>,
486        query_vec: &[f32],
487    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
488        let entity_id = self
489            .store
490            .upsert_entity(surface_name, normalized, et, summary)
491            .await?;
492        self.register_aliases(entity_id.0, normalized, original_name)
493            .await?;
494        self.store_entity_embedding(
495            emb_store,
496            entity_id.0,
497            None,
498            normalized,
499            et,
500            summary.unwrap_or(""),
501            query_vec,
502        )
503        .await;
504        Ok((entity_id.0, ResolutionOutcome::Created))
505    }
506
507    /// Register the normalized form and original trimmed form as aliases for an entity.
508    async fn register_aliases(
509        &self,
510        entity_id: i64,
511        normalized: &str,
512        original_name: &str,
513    ) -> Result<(), MemoryError> {
514        self.store.add_alias(entity_id, normalized).await?;
515
516        // Also register the original trimmed lowercased form if it differs from normalized
517        // (e.g. when control chars were stripped, leaving a shorter string).
518        let original_trimmed = original_name.trim().to_lowercase();
519        let original_clean_str = strip_control_chars(&original_trimmed);
520        let original_clean = truncate_to_bytes_ref(&original_clean_str, MAX_ENTITY_NAME_BYTES);
521        if original_clean != normalized {
522            self.store.add_alias(entity_id, original_clean).await?;
523        }
524
525        Ok(())
526    }
527
528    /// Merge an existing entity with new information: combine summaries, update Qdrant.
529    ///
530    /// `existing_canonical_name` and `existing_summary` are read from the Qdrant payload at
531    /// the call site (hot path, no `SQLite` roundtrip). Pass `None` for either when the point
532    /// predates the payload fields — a targeted `find_entity_by_id` read is then used as a
533    /// one-time fallback (legacy transition path, removed after all points are rewritten).
534    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
535    async fn merge_entity(
536        &self,
537        emb_store: &EmbeddingStore,
538        provider: &AnyProvider,
539        entity_id: i64,
540        new_surface_name: &str,
541        new_canonical_name: &str,
542        entity_type: EntityType,
543        new_summary: Option<&str>,
544        existing_canonical_name: Option<&str>,
545        existing_summary_payload: Option<&str>,
546        existing_point_id: Option<&str>,
547    ) -> Result<(), MemoryError> {
548        // Hot path: use values from the Qdrant payload (no SQLite roundtrip).
549        // Both canonical_name AND summary must be present; if either is absent the point
550        // predates the payload field — fall back to a targeted SQLite read to avoid
551        // silently dropping a summary that was written outside merge_entity.
552        let (existing_canonical, existing_summary, existing_point_id_owned) =
553            if existing_canonical_name.is_some() && existing_summary_payload.is_some() {
554                (
555                    existing_canonical_name
556                        .unwrap_or(new_canonical_name)
557                        .to_owned(),
558                    existing_summary_payload.unwrap_or("").to_owned(),
559                    existing_point_id.map(ToOwned::to_owned),
560                )
561            } else {
562                // Transition-period fallback. Legacy Qdrant points pre-date the payload
563                // fields; one targeted read is acceptable until the embedding is rewritten
564                // on the next merge. Also used when canonical_name is present but summary
565                // is absent — prevents overwriting a SQLite summary with empty string.
566                let existing = self.store.find_entity_by_id(entity_id).await?;
567                let canonical = existing_canonical_name.map_or_else(
568                    || {
569                        existing.as_ref().map_or_else(
570                            || new_canonical_name.to_owned(),
571                            |e| e.canonical_name.clone(),
572                        )
573                    },
574                    ToOwned::to_owned,
575                );
576                let summary = existing
577                    .as_ref()
578                    .and_then(|e| e.summary.as_deref())
579                    .unwrap_or("")
580                    .to_owned();
581                let pid = existing_point_id.map(ToOwned::to_owned).or_else(|| {
582                    existing
583                        .as_ref()
584                        .and_then(|e| e.qdrant_point_id.as_deref())
585                        .map(ToOwned::to_owned)
586                });
587                (canonical, summary, pid)
588            };
589
590        let merged_summary = if let Some(new) = new_summary {
591            if !new.is_empty() && !existing_summary.is_empty() {
592                let combined = format!("{existing_summary}; {new}");
593                // TODO(S2): use LLM-based summary merge when summary exceeds 512 bytes
594                truncate_to_bytes_ref(&combined, MAX_FACT_BYTES).to_owned()
595            } else if !new.is_empty() {
596                new.to_owned()
597            } else {
598                existing_summary.clone()
599            }
600        } else {
601            existing_summary.clone()
602        };
603
604        let summary_opt = if merged_summary.is_empty() {
605            None
606        } else {
607            Some(merged_summary.as_str())
608        };
609
610        // Preserve the existing display name from the payload; fall back to the incoming surface
611        // name only for brand-new entities (where the payload had no "name" field).
612        self.store
613            .upsert_entity(
614                new_surface_name,
615                &existing_canonical,
616                entity_type,
617                summary_opt,
618            )
619            .await?;
620
621        // Re-embed merged text and upsert to Qdrant
622        let embed_text = format!("{new_surface_name}: {merged_summary}");
623        let embed_result = tokio::time::timeout(
624            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
625            provider.embed(&embed_text),
626        )
627        .await;
628
629        match embed_result {
630            Ok(Ok(vec)) => {
631                self.store_entity_embedding(
632                    emb_store,
633                    entity_id,
634                    existing_point_id_owned.as_deref(),
635                    new_surface_name,
636                    entity_type,
637                    &merged_summary,
638                    &vec,
639                )
640                .await;
641            }
642            Ok(Err(err)) => {
643                tracing::warn!(
644                    entity_id,
645                    error = %err,
646                    "merge re-embed failed; Qdrant entry may be stale"
647                );
648            }
649            Err(_) => {
650                tracing::warn!(
651                    entity_id,
652                    "merge re-embed timed out; Qdrant entry may be stale"
653                );
654            }
655        }
656
657        Ok(())
658    }
659
660    /// Store an entity embedding in Qdrant and update `qdrant_point_id` in `SQLite`.
661    ///
662    /// When `existing_point_id` is `Some`, the existing Qdrant point is updated in-place
663    /// (upsert by ID) to avoid orphaned stale points. When `None`, a new point is created.
664    ///
665    /// Failures are logged at warn level but do not propagate — the entity is still
666    /// valid in `SQLite` even if Qdrant upsert fails.
667    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
668    async fn store_entity_embedding(
669        &self,
670        emb_store: &EmbeddingStore,
671        entity_id: i64,
672        existing_point_id: Option<&str>,
673        name: &str,
674        entity_type: EntityType,
675        summary: &str,
676        vector: &[f32],
677    ) {
678        // Ensure the Qdrant collection exists exactly once per resolver lifetime.
679        // All entity embeddings use the same model dimension, so the first call's
680        // vector_size wins — subsequent entities share the same collection.
681        // On error the cell stays unset, so the next entity retries — transient
682        // network failures do not permanently disable embedding storage.
683        let vector_size = u64::try_from(vector.len()).unwrap_or(384);
684        let collection_ensured = Arc::clone(&self.collection_ensured);
685        if let Err(err) = collection_ensured
686            .get_or_try_init(|| async {
687                emb_store
688                    .ensure_named_collection(ENTITY_COLLECTION, vector_size)
689                    .await
690            })
691            .await
692        {
693            tracing::error!(
694                error = %err,
695                "failed to ensure entity embedding collection; skipping Qdrant upsert"
696            );
697            return;
698        }
699
700        let payload = serde_json::json!({
701            "entity_id": entity_id,
702            // String mirror of entity_id for scroll_all enumeration (scroll_all only surfaces
703            // StringValue payload fields; the i64 entity_id is preserved for existing search
704            // consumers which read it directly from ScoredVectorPoint.payload).
705            "entity_id_str": entity_id.to_string(),
706            "canonical_name": name,
707            "name": name,
708            "entity_type": entity_type.as_str(),
709            "summary": summary,
710        });
711
712        if let Some(point_id) = existing_point_id {
713            // Reuse existing point to avoid orphaned stale points (IC-S1)
714            if let Err(err) = emb_store
715                .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
716                .await
717            {
718                tracing::warn!(
719                    entity_id,
720                    error = %err,
721                    "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
722                );
723            }
724        } else {
725            match emb_store
726                .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
727                .await
728            {
729                Ok(point_id) => {
730                    if let Err(err) = self
731                        .store
732                        .set_entity_qdrant_point_id(entity_id, &point_id)
733                        .await
734                    {
735                        tracing::warn!(
736                            entity_id,
737                            error = %err,
738                            "failed to store qdrant_point_id in SQLite"
739                        );
740                    }
741                }
742                Err(err) => {
743                    tracing::warn!(
744                        entity_id,
745                        error = %err,
746                        "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
747                    );
748                }
749            }
750        }
751    }
752
753    /// Ask the LLM whether two entities are the same.
754    ///
755    /// Returns `Some(true)` for merge, `Some(false)` for separate, `None` on failure.
756    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
757    async fn llm_disambiguate(
758        &self,
759        provider: &AnyProvider,
760        new_name: &str,
761        new_type: &str,
762        new_summary: &str,
763        existing_name: &str,
764        existing_type: &str,
765        existing_summary: &str,
766        score: f32,
767    ) -> Option<bool> {
768        let prompt = format!(
769            "New entity:\n\
770             - Name: <external-data>{new_name}</external-data>\n\
771             - Type: <external-data>{new_type}</external-data>\n\
772             - Summary: <external-data>{new_summary}</external-data>\n\
773             \n\
774             Existing entity:\n\
775             - Name: <external-data>{existing_name}</external-data>\n\
776             - Type: <external-data>{existing_type}</external-data>\n\
777             - Summary: <external-data>{existing_summary}</external-data>\n\
778             \n\
779             Cosine similarity: {score:.2}\n\
780             \n\
781             Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
782        );
783
784        let messages = [
785            Message::from_legacy(
786                Role::System,
787                "You are an entity disambiguation assistant. Given a new entity mention and \
788                 an existing entity from the knowledge graph, determine if they refer to the same \
789                 real-world entity. Respond only with JSON.",
790            ),
791            Message::from_legacy(Role::User, prompt),
792        ];
793
794        let response = match provider.chat(&messages).await {
795            Ok(r) => r,
796            Err(err) => {
797                tracing::warn!(error = %err, "LLM disambiguation chat failed");
798                return None;
799            }
800        };
801
802        // Parse JSON response, tolerating markdown code fences
803        let json_str = extract_json(&response);
804        match serde_json::from_str::<DisambiguationResponse>(json_str) {
805            Ok(parsed) => Some(parsed.same_entity),
806            Err(err) => {
807                tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
808                None
809            }
810        }
811    }
812
813    /// Resolve a batch of extracted entities concurrently.
814    ///
815    /// Returns a `Vec` of `(entity_id, ResolutionOutcome)` in the same order as input.
816    ///
817    /// # Errors
818    ///
819    /// Returns an error if any DB operation fails.
820    ///
821    /// # Panics
822    ///
823    /// Panics if an internal stream collection bug causes a result index to be missing.
824    /// This indicates a programming error and should never occur in correct usage.
825    pub async fn resolve_batch(
826        &self,
827        entities: &[ExtractedEntity],
828    ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
829        if entities.is_empty() {
830            return Ok(Vec::new());
831        }
832
833        // Process up to 4 embed+resolve operations concurrently (IC-S2/PERF-01).
834        let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
835
836        let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
837            let name = e.name.clone();
838            let entity_type = e.entity_type.clone();
839            let summary = e.summary.clone();
840            async move {
841                let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
842                (i, result)
843            }
844        }))
845        .buffer_unordered(4);
846
847        while let Some((i, result)) = stream.next().await {
848            match result {
849                Ok(outcome) => results[i] = Some(outcome),
850                Err(err) => return Err(err),
851            }
852        }
853
854        Ok(results
855            .into_iter()
856            .enumerate()
857            .map(|(i, r)| {
858                r.unwrap_or_else(|| {
859                    tracing::warn!(
860                        index = i,
861                        "resolve_batch: missing result at index — bug in stream collection"
862                    );
863                    panic!("resolve_batch: missing result at index {i}")
864                })
865            })
866            .collect())
867    }
868
869    /// Resolve an extracted edge: deduplicate or supersede existing edges.
870    ///
871    /// - If an active edge with the same direction and relation exists with an identical fact,
872    ///   returns `None` (deduplicated).
873    /// - If an active edge with the same direction and relation exists with a different fact,
874    ///   invalidates the old edge and inserts the new one, returning `Some(new_id)`.
875    /// - If no matching edge exists, inserts a new edge and returns `Some(new_id)`.
876    ///
877    /// Relation and fact strings are sanitized (control chars stripped, length-capped).
878    ///
879    /// # Errors
880    ///
881    /// Returns an error if any database operation fails.
882    pub async fn resolve_edge(
883        &self,
884        source_id: i64,
885        target_id: i64,
886        relation: &str,
887        fact: &str,
888        confidence: f32,
889        episode_id: Option<MessageId>,
890    ) -> Result<Option<i64>, MemoryError> {
891        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
892        let normalized_relation =
893            truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
894
895        let fact_clean = strip_control_chars(fact.trim());
896        let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
897
898        // Fetch only exact-direction edges — no reverse edges to filter out
899        let existing_edges = self.store.edges_exact(source_id, target_id).await?;
900
901        let matching = existing_edges
902            .iter()
903            .find(|e| e.relation == normalized_relation);
904
905        if let Some(old) = matching {
906            if old.fact == normalized_fact {
907                // Exact duplicate — skip
908                return Ok(None);
909            }
910            // Same relation, different fact — supersede
911            self.store.invalidate_edge(old.id).await?;
912        }
913
914        let new_id = self
915            .store
916            .insert_edge(
917                source_id,
918                target_id,
919                &normalized_relation,
920                &normalized_fact,
921                confidence,
922                episode_id,
923            )
924            .await?;
925        Ok(Some(new_id))
926    }
927
928    /// Resolve a typed edge: deduplicate or supersede existing edges of the same type.
929    ///
930    /// Identical to [`Self::resolve_edge`] but includes `edge_type` in the matching key.
931    /// An active edge with the same `(source, target, relation, edge_type)` and identical
932    /// fact returns `None`; same relation+type with different fact is superseded.
933    ///
934    /// When `belief_revision` is `Some`, uses semantic contradiction detection to find edges
935    /// to supersede across the same relation domain. The new fact embedding is pre-computed
936    /// here (one embed call) to avoid N+1 embedding calls.
937    ///
938    /// This ensures that different MAGMA edge types for the same entity pair are stored
939    /// independently (critic mitigation: dedup key includes `edge_type`).
940    ///
941    /// # Errors
942    ///
943    /// Returns an error if any database operation fails.
944    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
945    pub async fn resolve_edge_typed(
946        &self,
947        source_id: i64,
948        target_id: i64,
949        relation: &str,
950        fact: &str,
951        confidence: f32,
952        episode_id: Option<crate::types::MessageId>,
953        edge_type: crate::graph::EdgeType,
954        belief_revision: Option<&crate::graph::BeliefRevisionConfig>,
955    ) -> Result<Option<i64>, MemoryError> {
956        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
957        let normalized_relation =
958            truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
959
960        let fact_clean = strip_control_chars(fact.trim());
961        let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
962
963        let existing_edges = self.store.edges_exact(source_id, target_id).await?;
964
965        // Exact dedup: same (relation, edge_type, fact) → skip.
966        let matching = existing_edges
967            .iter()
968            .find(|e| e.relation == normalized_relation && e.edge_type == edge_type);
969
970        if matching.is_some_and(|old| old.fact == normalized_fact) {
971            return Ok(None);
972        }
973
974        // Determine which edges to supersede.
975        let superseded_ids: Vec<i64> = if let (Some(cfg), Some(provider)) =
976            (belief_revision, self.provider)
977        {
978            // Kumiho belief revision: embed new fact once, find semantically contradicted edges.
979            match tokio::time::timeout(
980                std::time::Duration::from_secs(5),
981                provider.embed(&normalized_fact),
982            )
983            .await
984            {
985                Ok(Ok(new_emb)) => {
986                    match crate::graph::belief_revision::find_superseded_edges(
987                        &existing_edges,
988                        &new_emb,
989                        &normalized_relation,
990                        edge_type,
991                        provider,
992                        cfg,
993                    )
994                    .await
995                    {
996                        Ok(ids) => ids,
997                        Err(err) => {
998                            tracing::warn!(error = %err,
999                                    "belief_revision: find_superseded_edges failed, falling back to exact match");
1000                            matching.map(|e| vec![e.id]).unwrap_or_default()
1001                        }
1002                    }
1003                }
1004                Ok(Err(err)) => {
1005                    tracing::warn!(error = %err,
1006                            "belief_revision: embed new fact failed, falling back to exact match");
1007                    matching.map(|e| vec![e.id]).unwrap_or_default()
1008                }
1009                Err(_) => {
1010                    tracing::warn!(
1011                        "belief_revision: embed new fact timed out, falling back to exact match"
1012                    );
1013                    matching.map(|e| vec![e.id]).unwrap_or_default()
1014                }
1015            }
1016        } else {
1017            // Legacy: exact (relation, edge_type) match with different fact.
1018            matching.map(|e| vec![e.id]).unwrap_or_default()
1019        };
1020
1021        let new_id = self
1022            .store
1023            .insert_edge_typed(
1024                source_id,
1025                target_id,
1026                &normalized_relation,
1027                &normalized_fact,
1028                confidence,
1029                episode_id,
1030                edge_type,
1031            )
1032            .await?;
1033
1034        // Supersede old edges with audit trail (belief revision) or plain invalidation (legacy).
1035        for old_id in superseded_ids {
1036            if belief_revision.is_some() {
1037                self.store
1038                    .invalidate_edge_with_supersession(old_id, new_id)
1039                    .await?;
1040            } else {
1041                self.store.invalidate_edge(old_id).await?;
1042            }
1043        }
1044
1045        Ok(Some(new_id))
1046    }
1047}
1048
/// Extract a JSON payload from an LLM response that may be wrapped in markdown.
///
/// Checked in order:
/// 1. A response that starts with a ```` ``` ```` fence and contains a closing fence yields
///    the fenced content. The opening line's info string (`json`, `JSON`, `jsonc`, ...) is
///    removed.
/// 2. Otherwise the span from the first `{` to the last `}` (inclusive) is returned, which
///    drops prose the model wrote around the object.
/// 3. Otherwise the trimmed input is returned unchanged.
///
/// The result is not validated; callers still parse it and handle failures.
fn extract_json(s: &str) -> &str {
    /// Remove a fence info string from fenced content.
    ///
    /// Per CommonMark, the rest of the opening-fence line is the info string, so that line
    /// is dropped whenever it cannot be the start of the JSON itself. When content starts on
    /// the fence line (non-standard, e.g. ```` ```json {"a":1}``` ````), only a literal
    /// `json` tag is removed.
    fn strip_info_string(inner: &str) -> &str {
        match inner.split_once('\n') {
            Some((info, body)) if !info.contains(['{', '[', '"']) => body,
            _ => inner.strip_prefix("json").unwrap_or(inner),
        }
    }

    let trimmed = s.trim();

    // Fenced block: take everything between the opening ``` and the last ```.
    let fenced = trimmed
        .strip_prefix("```")
        .and_then(|rest| rest.rfind("```").map(|end| &rest[..end]));
    if let Some(inner) = fenced {
        return strip_info_string(inner).trim();
    }

    // Unfenced: first '{' to last '}'.
    match (trimmed.find('{'), trimmed.rfind('}')) {
        (Some(start), Some(end)) if start <= end => &trimmed[start..=end],
        _ => trimmed,
    }
}
1071
1072#[cfg(test)]
1073mod tests;