Skip to main content

zeph_memory/graph/resolver/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_common::sanitize::strip_control_chars;
12use zeph_common::text::truncate_to_bytes_ref;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{LlmProvider as _, Message, Role};
15
16use super::store::GraphStore;
17use super::types::EntityType;
18use crate::embedding_store::EmbeddingStore;
19use crate::error::MemoryError;
20use crate::graph::extractor::ExtractedEntity;
21use crate::types::MessageId;
22use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
23
24/// Minimum byte length for entity names — rejects noise tokens like "go", "cd".
25const MIN_ENTITY_NAME_BYTES: usize = 3;
26/// Maximum byte length for entity names stored in the graph.
27const MAX_ENTITY_NAME_BYTES: usize = 512;
28/// Maximum byte length for relation strings.
29const MAX_RELATION_BYTES: usize = 256;
30/// Maximum byte length for fact strings.
31const MAX_FACT_BYTES: usize = 2048;
32
33/// Qdrant collection for entity embeddings.
34const ENTITY_COLLECTION: &str = "zeph_graph_entities";
35
36/// Timeout for a single `embed()` call in seconds.
37const EMBED_TIMEOUT_SECS: u64 = 30;
38
39/// Outcome of an entity resolution attempt.
40#[derive(Debug, Clone, PartialEq)]
41#[non_exhaustive]
42pub enum ResolutionOutcome {
43    /// Exact name+type match in `SQLite`.
44    ExactMatch,
45    /// Cosine similarity >= merge threshold; score is the cosine similarity value.
46    EmbeddingMatch { score: f32 },
47    /// LLM confirmed merge in ambiguous similarity range.
48    LlmDisambiguated,
49    /// New entity was created.
50    Created,
51}
52
53/// LLM response for entity disambiguation.
54#[derive(Debug, Deserialize, JsonSchema)]
55struct DisambiguationResponse {
56    same_entity: bool,
57}
58
59/// Per-entity-name lock guard to prevent concurrent duplicate creation.
60///
61/// Keyed by normalized entity name. Entities with different names resolve concurrently;
62/// entities with the same name are serialized.
63///
64/// TODO(SEC-M33-02): This map grows unboundedly — one entry per unique normalized name.
65/// For a short-lived resolver this is acceptable. If the resolver becomes long-lived
66/// (stored in `SemanticMemory`), add eviction or use a fixed-size sharded lock array.
67type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
68
69pub struct EntityResolver<'a> {
70    store: &'a GraphStore,
71    embedding_store: Option<&'a Arc<EmbeddingStore>>,
72    provider: Option<&'a AnyProvider>,
73    similarity_threshold: f32,
74    ambiguous_threshold: f32,
75    name_locks: NameLockMap,
76    /// Counter for error-triggered fallbacks (embed/LLM failures). Tests can read this via Arc.
77    fallback_count: Arc<std::sync::atomic::AtomicU64>,
78    /// Ensures `ensure_named_collection()` is called at most once per resolver lifetime.
79    ///
80    /// Arc-wrapped so future clones or spawned tasks can share the same gate.
81    collection_ensured: Arc<tokio::sync::OnceCell<()>>,
82    /// Per-call timeout for every `embed()` invocation. Default: 5 s.
83    embed_timeout: std::time::Duration,
84}
85
86impl<'a> EntityResolver<'a> {
87    /// Returns a reference to the underlying graph store.
88    #[must_use]
89    pub fn graph_store(&self) -> &GraphStore {
90        self.store
91    }
92
93    #[must_use]
94    pub fn new(store: &'a GraphStore) -> Self {
95        Self {
96            store,
97            embedding_store: None,
98            provider: None,
99            similarity_threshold: 0.85,
100            ambiguous_threshold: 0.70,
101            name_locks: Arc::new(DashMap::new()),
102            fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
103            collection_ensured: Arc::new(tokio::sync::OnceCell::new()),
104            embed_timeout: std::time::Duration::from_secs(5),
105        }
106    }
107
108    /// Set the per-call timeout for every `embed()` invocation.
109    ///
110    /// Default: 5 s.
111    #[must_use]
112    pub fn with_embed_timeout(mut self, timeout_secs: u64) -> Self {
113        self.embed_timeout = std::time::Duration::from_secs(timeout_secs);
114        self
115    }
116
117    #[must_use]
118    pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
119        self.embedding_store = Some(store);
120        self
121    }
122
123    #[must_use]
124    pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
125        self.provider = Some(provider);
126        self
127    }
128
129    #[must_use]
130    pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
131        self.similarity_threshold = similarity;
132        self.ambiguous_threshold = ambiguous;
133        self
134    }
135
136    /// Shared fallback counter — tests can clone this Arc to inspect the value.
137    #[must_use]
138    pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
139        Arc::clone(&self.fallback_count)
140    }
141
142    /// Normalize an entity name: trim, lowercase, strip control chars, truncate.
143    fn normalize_name(name: &str) -> String {
144        let lowered = name.trim().to_lowercase();
145        let cleaned = strip_control_chars(&lowered);
146        let normalized = truncate_to_bytes_ref(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
147        if normalized.len() < cleaned.len() {
148            tracing::debug!(
149                "graph resolver: entity name truncated to {} bytes",
150                MAX_ENTITY_NAME_BYTES
151            );
152        }
153        normalized
154    }
155
156    /// Parse an entity type string, falling back to `Concept` on unknown values.
157    fn parse_entity_type(entity_type: &str) -> EntityType {
158        entity_type
159            .trim()
160            .to_lowercase()
161            .parse::<EntityType>()
162            .unwrap_or_else(|_| {
163                tracing::debug!(
164                    "graph resolver: unknown entity type {:?}, falling back to Concept",
165                    entity_type
166                );
167                EntityType::Concept
168            })
169    }
170
171    /// Acquire the per-name lock and return the guard. Keeps lock alive for the caller.
172    async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
173        let lock = self
174            .name_locks
175            .entry(normalized.to_owned())
176            .or_insert_with(|| Arc::new(Mutex::new(())))
177            .clone();
178        lock.lock_owned().await
179    }
180
181    /// Resolve an extracted entity using the alias-first canonicalization pipeline.
182    ///
183    /// Pipeline:
184    /// 1. Normalize: trim, lowercase, strip control chars, truncate to 512 bytes.
185    /// 2. Parse entity type (fallback to Concept on unknown).
186    /// 3. Alias lookup: search `graph_entity_aliases` by normalized name + `entity_type`.
187    ///    If found, touch `last_seen_at` and return the existing entity id.
188    /// 4. Canonical name lookup: search `graph_entities` by `canonical_name` + `entity_type`.
189    ///    If found, touch `last_seen_at` and return the existing entity id.
190    /// 5. When `embedding_store` and `provider` are configured, performs embedding-based fuzzy
191    ///    matching: cosine similarity search (Qdrant), LLM disambiguation for ambiguous range,
192    ///    merge or create based on result. Failures degrade gracefully to step 6.
193    /// 6. Create: upsert new entity with `canonical_name` = normalized name.
194    /// 7. Register the normalized form (and original trimmed form if different) as aliases.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if the entity name is empty after normalization, or if a DB operation fails.
199    pub async fn resolve(
200        &self,
201        name: &str,
202        entity_type: &str,
203        summary: Option<&str>,
204    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
205        let normalized = Self::normalize_name(name);
206
207        if normalized.is_empty() {
208            return Err(MemoryError::GraphStore("empty entity name".into()));
209        }
210
211        if normalized.len() < MIN_ENTITY_NAME_BYTES {
212            return Err(MemoryError::GraphStore(format!(
213                "entity name too short: {normalized:?} ({} bytes, min {MIN_ENTITY_NAME_BYTES})",
214                normalized.len()
215            )));
216        }
217
218        let et = Self::parse_entity_type(entity_type);
219
220        // The surface form preserves the original casing for user-facing display.
221        let surface_name = name.trim().to_owned();
222
223        // Acquire per-name lock to prevent concurrent duplicate creation.
224        let _guard = self.lock_name(&normalized).await;
225
226        // Step 3: alias-first lookup (filters by entity_type to prevent cross-type collisions).
227        if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
228            self.store
229                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
230                .await?;
231            return Ok((entity.id.0, ResolutionOutcome::ExactMatch));
232        }
233
234        // Step 4: canonical name lookup.
235        if let Some(entity) = self.store.find_entity(&normalized, et).await? {
236            self.store
237                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
238                .await?;
239            return Ok((entity.id.0, ResolutionOutcome::ExactMatch));
240        }
241
242        // Step 5: Embedding-based resolution (when configured).
243        if let Some(outcome) = self
244            .resolve_via_embedding(&normalized, name, &surface_name, et, summary)
245            .await?
246        {
247            return Ok(outcome);
248        }
249
250        // Step 6: Create new entity (no embedding store, or embedding failure).
251        let entity_id = self
252            .store
253            .upsert_entity(&surface_name, &normalized, et, summary)
254            .await?;
255
256        self.register_aliases(entity_id.0, &normalized, name)
257            .await?;
258
259        Ok((entity_id.0, ResolutionOutcome::Created))
260    }
261
262    /// Compute embedding for an entity, incrementing `fallback_count` on failure/timeout.
263    /// Returns `None` when embedding is unavailable (caller should skip vector operations).
264    async fn embed_entity_text(
265        &self,
266        provider: &AnyProvider,
267        normalized: &str,
268        summary: Option<&str>,
269    ) -> Option<Vec<f32>> {
270        let safe_summary = truncate_to_bytes_ref(summary.unwrap_or(""), MAX_FACT_BYTES);
271        let embed_text = format!("{normalized}: {safe_summary}");
272        let embed_result = tokio::time::timeout(
273            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
274            provider.embed(&embed_text),
275        )
276        .await;
277        match embed_result {
278            Ok(Ok(v)) => Some(v),
279            Ok(Err(err)) => {
280                self.fallback_count
281                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
282                tracing::warn!(entity_name = %normalized, error = %err,
283                    "embed() failed; falling back to exact-match-only entity creation");
284                None
285            }
286            Err(_timeout) => {
287                self.fallback_count
288                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
289                tracing::warn!(entity_name = %normalized,
290                    "embed() timed out after {}s; falling back to create new entity",
291                    EMBED_TIMEOUT_SECS);
292                None
293            }
294        }
295    }
296
297    /// Handle a candidate in the ambiguous score range by running LLM disambiguation.
298    /// Returns `Ok(Some(...))` if the LLM confirms a match, `Ok(None)` to fall through to create.
299    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
300    async fn handle_ambiguous_candidate(
301        &self,
302        emb_store: &EmbeddingStore,
303        provider: &AnyProvider,
304        payload: &std::collections::HashMap<String, serde_json::Value>,
305        point_id: &str,
306        score: f32,
307        surface_name: &str,
308        normalized: &str,
309        et: EntityType,
310        summary: Option<&str>,
311    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
312        let entity_id = payload
313            .get("entity_id")
314            .and_then(serde_json::Value::as_i64)
315            .ok_or_else(|| MemoryError::GraphStore("missing entity_id in payload".into()))?;
316        let existing_name = payload
317            .get("name")
318            .and_then(|v| v.as_str())
319            .unwrap_or("")
320            .to_owned();
321        let existing_summary = payload
322            .get("summary")
323            .and_then(|v| v.as_str())
324            .unwrap_or("")
325            .to_owned();
326        // Use the existing entity's actual type from the payload (IC-S3)
327        let existing_type = payload
328            .get("entity_type")
329            .and_then(|v| v.as_str())
330            .unwrap_or(et.as_str())
331            .to_owned();
332        let existing_canonical = payload.get("canonical_name").and_then(|v| v.as_str());
333        let existing_summary_str = payload.get("summary").and_then(|v| v.as_str());
334        match self
335            .llm_disambiguate(
336                provider,
337                normalized,
338                et.as_str(),
339                summary.unwrap_or(""),
340                &existing_name,
341                &existing_type,
342                &existing_summary,
343                score,
344            )
345            .await
346        {
347            Some(true) => {
348                self.merge_entity(
349                    emb_store,
350                    provider,
351                    entity_id,
352                    surface_name,
353                    normalized,
354                    et,
355                    summary,
356                    existing_canonical,
357                    existing_summary_str,
358                    Some(point_id),
359                )
360                .await?;
361                Ok(Some((entity_id, ResolutionOutcome::LlmDisambiguated)))
362            }
363            Some(false) => Ok(None),
364            None => {
365                self.fallback_count
366                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
367                tracing::warn!(entity_name = %normalized,
368                    "LLM disambiguation failed; falling back to create new entity");
369                Ok(None)
370            }
371        }
372    }
373
374    /// Attempt embedding-based resolution. Returns `Ok(Some(...))` if resolved (early return),
375    /// `Ok(None)` if no match found (caller should fall through to create), or `Err` on DB error.
376    async fn resolve_via_embedding(
377        &self,
378        normalized: &str,
379        original_name: &str,
380        surface_name: &str,
381        et: EntityType,
382        summary: Option<&str>,
383    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
384        let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) else {
385            return Ok(None);
386        };
387
388        let Some(query_vec) = self.embed_entity_text(provider, normalized, summary).await else {
389            return Ok(None);
390        };
391
392        let type_filter = VectorFilter {
393            must: vec![FieldCondition {
394                field: "entity_type".into(),
395                value: FieldValue::Text(et.as_str().to_owned()),
396            }],
397            must_not: vec![],
398        };
399        let candidates = match emb_store
400            .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
401            .await
402        {
403            Ok(c) => c,
404            Err(err) => {
405                self.fallback_count
406                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
407                tracing::warn!(entity_name = %normalized, error = %err,
408                    "Qdrant search failed; falling back to create new entity");
409                return self
410                    .create_with_embedding(
411                        emb_store,
412                        surface_name,
413                        normalized,
414                        original_name,
415                        et,
416                        summary,
417                        &query_vec,
418                    )
419                    .await
420                    .map(Some);
421            }
422        };
423
424        if let Some(best) = candidates.first() {
425            let score = best.score;
426            if score >= self.similarity_threshold {
427                let entity_id = best
428                    .payload
429                    .get("entity_id")
430                    .and_then(serde_json::Value::as_i64)
431                    .ok_or_else(|| {
432                        MemoryError::GraphStore("missing entity_id in payload".into())
433                    })?;
434                let existing_canonical =
435                    best.payload.get("canonical_name").and_then(|v| v.as_str());
436                let existing_summary = best.payload.get("summary").and_then(|v| v.as_str());
437                let existing_pid = Some(best.id.as_str());
438                self.merge_entity(
439                    emb_store,
440                    provider,
441                    entity_id,
442                    surface_name,
443                    normalized,
444                    et,
445                    summary,
446                    existing_canonical,
447                    existing_summary,
448                    existing_pid,
449                )
450                .await?;
451                return Ok(Some((
452                    entity_id,
453                    ResolutionOutcome::EmbeddingMatch { score },
454                )));
455            } else if score >= self.ambiguous_threshold
456                && let Some(result) = self
457                    .handle_ambiguous_candidate(
458                        emb_store,
459                        provider,
460                        &best.payload,
461                        &best.id,
462                        score,
463                        surface_name,
464                        normalized,
465                        et,
466                        summary,
467                    )
468                    .await?
469            {
470                return Ok(Some(result));
471            }
472            // score < ambiguous_threshold or LLM said different: fall through to create with embedding
473        }
474
475        // No suitable match — create new entity and store embedding.
476        self.create_with_embedding(
477            emb_store,
478            surface_name,
479            normalized,
480            original_name,
481            et,
482            summary,
483            &query_vec,
484        )
485        .await
486        .map(Some)
487    }
488
489    /// Create a new entity, register aliases, and store its embedding in Qdrant.
490    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
491    async fn create_with_embedding(
492        &self,
493        emb_store: &EmbeddingStore,
494        surface_name: &str,
495        normalized: &str,
496        original_name: &str,
497        et: EntityType,
498        summary: Option<&str>,
499        query_vec: &[f32],
500    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
501        let entity_id = self
502            .store
503            .upsert_entity(surface_name, normalized, et, summary)
504            .await?;
505        self.register_aliases(entity_id.0, normalized, original_name)
506            .await?;
507        self.store_entity_embedding(
508            emb_store,
509            entity_id.0,
510            None,
511            normalized,
512            et,
513            summary.unwrap_or(""),
514            query_vec,
515        )
516        .await;
517        Ok((entity_id.0, ResolutionOutcome::Created))
518    }
519
520    /// Register the normalized form and original trimmed form as aliases for an entity.
521    async fn register_aliases(
522        &self,
523        entity_id: i64,
524        normalized: &str,
525        original_name: &str,
526    ) -> Result<(), MemoryError> {
527        self.store.add_alias(entity_id, normalized).await?;
528
529        // Also register the original trimmed lowercased form if it differs from normalized
530        // (e.g. when control chars were stripped, leaving a shorter string).
531        let original_trimmed = original_name.trim().to_lowercase();
532        let original_clean_str = strip_control_chars(&original_trimmed);
533        let original_clean = truncate_to_bytes_ref(&original_clean_str, MAX_ENTITY_NAME_BYTES);
534        if original_clean != normalized {
535            self.store.add_alias(entity_id, original_clean).await?;
536        }
537
538        Ok(())
539    }
540
541    /// Merge an existing entity with new information: combine summaries, update Qdrant.
542    ///
543    /// `existing_canonical_name` and `existing_summary` are read from the Qdrant payload at
544    /// the call site (hot path, no `SQLite` roundtrip). Pass `None` for either when the point
545    /// predates the payload fields — a targeted `find_entity_by_id` read is then used as a
546    /// one-time fallback (legacy transition path, removed after all points are rewritten).
547    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
548    async fn merge_entity(
549        &self,
550        emb_store: &EmbeddingStore,
551        provider: &AnyProvider,
552        entity_id: i64,
553        new_surface_name: &str,
554        new_canonical_name: &str,
555        entity_type: EntityType,
556        new_summary: Option<&str>,
557        existing_canonical_name: Option<&str>,
558        existing_summary_payload: Option<&str>,
559        existing_point_id: Option<&str>,
560    ) -> Result<(), MemoryError> {
561        // Hot path: use values from the Qdrant payload (no SQLite roundtrip).
562        // Both canonical_name AND summary must be present; if either is absent the point
563        // predates the payload field — fall back to a targeted SQLite read to avoid
564        // silently dropping a summary that was written outside merge_entity.
565        let (existing_canonical, existing_summary, existing_point_id_owned) =
566            if existing_canonical_name.is_some() && existing_summary_payload.is_some() {
567                (
568                    existing_canonical_name
569                        .unwrap_or(new_canonical_name)
570                        .to_owned(),
571                    existing_summary_payload.unwrap_or("").to_owned(),
572                    existing_point_id.map(ToOwned::to_owned),
573                )
574            } else {
575                // Transition-period fallback. Legacy Qdrant points pre-date the payload
576                // fields; one targeted read is acceptable until the embedding is rewritten
577                // on the next merge. Also used when canonical_name is present but summary
578                // is absent — prevents overwriting a SQLite summary with empty string.
579                let existing = self.store.find_entity_by_id(entity_id).await?;
580                let canonical = existing_canonical_name.map_or_else(
581                    || {
582                        existing.as_ref().map_or_else(
583                            || new_canonical_name.to_owned(),
584                            |e| e.canonical_name.clone(),
585                        )
586                    },
587                    ToOwned::to_owned,
588                );
589                let summary = existing
590                    .as_ref()
591                    .and_then(|e| e.summary.as_deref())
592                    .unwrap_or("")
593                    .to_owned();
594                let pid = existing_point_id.map(ToOwned::to_owned).or_else(|| {
595                    existing
596                        .as_ref()
597                        .and_then(|e| e.qdrant_point_id.as_deref())
598                        .map(ToOwned::to_owned)
599                });
600                (canonical, summary, pid)
601            };
602
603        let merged_summary = if let Some(new) = new_summary {
604            if !new.is_empty() && !existing_summary.is_empty() {
605                let combined = format!("{existing_summary}; {new}");
606                // TODO(S2): use LLM-based summary merge when summary exceeds 512 bytes
607                truncate_to_bytes_ref(&combined, MAX_FACT_BYTES).to_owned()
608            } else if !new.is_empty() {
609                new.to_owned()
610            } else {
611                existing_summary.clone()
612            }
613        } else {
614            existing_summary.clone()
615        };
616
617        let summary_opt = if merged_summary.is_empty() {
618            None
619        } else {
620            Some(merged_summary.as_str())
621        };
622
623        // Preserve the existing display name from the payload; fall back to the incoming surface
624        // name only for brand-new entities (where the payload had no "name" field).
625        self.store
626            .upsert_entity(
627                new_surface_name,
628                &existing_canonical,
629                entity_type,
630                summary_opt,
631            )
632            .await?;
633
634        // Re-embed merged text and upsert to Qdrant
635        let embed_text = format!("{new_surface_name}: {merged_summary}");
636        let embed_result = tokio::time::timeout(
637            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
638            provider.embed(&embed_text),
639        )
640        .await;
641
642        match embed_result {
643            Ok(Ok(vec)) => {
644                self.store_entity_embedding(
645                    emb_store,
646                    entity_id,
647                    existing_point_id_owned.as_deref(),
648                    new_surface_name,
649                    entity_type,
650                    &merged_summary,
651                    &vec,
652                )
653                .await;
654            }
655            Ok(Err(err)) => {
656                tracing::warn!(
657                    entity_id,
658                    error = %err,
659                    "merge re-embed failed; Qdrant entry may be stale"
660                );
661            }
662            Err(_) => {
663                tracing::warn!(
664                    entity_id,
665                    "merge re-embed timed out; Qdrant entry may be stale"
666                );
667            }
668        }
669
670        Ok(())
671    }
672
673    /// Store an entity embedding in Qdrant and update `qdrant_point_id` in `SQLite`.
674    ///
675    /// When `existing_point_id` is `Some`, the existing Qdrant point is updated in-place
676    /// (upsert by ID) to avoid orphaned stale points. When `None`, a new point is created.
677    ///
678    /// Failures are logged at warn level but do not propagate — the entity is still
679    /// valid in `SQLite` even if Qdrant upsert fails.
680    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
681    async fn store_entity_embedding(
682        &self,
683        emb_store: &EmbeddingStore,
684        entity_id: i64,
685        existing_point_id: Option<&str>,
686        name: &str,
687        entity_type: EntityType,
688        summary: &str,
689        vector: &[f32],
690    ) {
691        // Ensure the Qdrant collection exists exactly once per resolver lifetime.
692        // All entity embeddings use the same model dimension, so the first call's
693        // vector_size wins — subsequent entities share the same collection.
694        // On error the cell stays unset, so the next entity retries — transient
695        // network failures do not permanently disable embedding storage.
696        let vector_size = u64::try_from(vector.len()).unwrap_or(384);
697        let collection_ensured = Arc::clone(&self.collection_ensured);
698        if let Err(err) = collection_ensured
699            .get_or_try_init(|| async {
700                emb_store
701                    .ensure_named_collection(ENTITY_COLLECTION, vector_size)
702                    .await
703            })
704            .await
705        {
706            tracing::error!(
707                error = %err,
708                "failed to ensure entity embedding collection; skipping Qdrant upsert"
709            );
710            return;
711        }
712
713        let payload = serde_json::json!({
714            "entity_id": entity_id,
715            // String mirror of entity_id for scroll_all enumeration (scroll_all only surfaces
716            // StringValue payload fields; the i64 entity_id is preserved for existing search
717            // consumers which read it directly from ScoredVectorPoint.payload).
718            "entity_id_str": entity_id.to_string(),
719            "canonical_name": name,
720            "name": name,
721            "entity_type": entity_type.as_str(),
722            "summary": summary,
723        });
724
725        if let Some(point_id) = existing_point_id {
726            // Reuse existing point to avoid orphaned stale points (IC-S1)
727            if let Err(err) = emb_store
728                .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
729                .await
730            {
731                tracing::warn!(
732                    entity_id,
733                    error = %err,
734                    "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
735                );
736            }
737        } else {
738            match emb_store
739                .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
740                .await
741            {
742                Ok(point_id) => {
743                    if let Err(err) = self
744                        .store
745                        .set_entity_qdrant_point_id(entity_id, &point_id)
746                        .await
747                    {
748                        tracing::warn!(
749                            entity_id,
750                            error = %err,
751                            "failed to store qdrant_point_id in SQLite"
752                        );
753                    }
754                }
755                Err(err) => {
756                    tracing::warn!(
757                        entity_id,
758                        error = %err,
759                        "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
760                    );
761                }
762            }
763        }
764    }
765
766    /// Ask the LLM whether two entities are the same.
767    ///
768    /// Returns `Some(true)` for merge, `Some(false)` for separate, `None` on failure.
769    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
770    async fn llm_disambiguate(
771        &self,
772        provider: &AnyProvider,
773        new_name: &str,
774        new_type: &str,
775        new_summary: &str,
776        existing_name: &str,
777        existing_type: &str,
778        existing_summary: &str,
779        score: f32,
780    ) -> Option<bool> {
781        let prompt = format!(
782            "New entity:\n\
783             - Name: <external-data>{new_name}</external-data>\n\
784             - Type: <external-data>{new_type}</external-data>\n\
785             - Summary: <external-data>{new_summary}</external-data>\n\
786             \n\
787             Existing entity:\n\
788             - Name: <external-data>{existing_name}</external-data>\n\
789             - Type: <external-data>{existing_type}</external-data>\n\
790             - Summary: <external-data>{existing_summary}</external-data>\n\
791             \n\
792             Cosine similarity: {score:.2}\n\
793             \n\
794             Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
795        );
796
797        let messages = [
798            Message::from_legacy(
799                Role::System,
800                "You are an entity disambiguation assistant. Given a new entity mention and \
801                 an existing entity from the knowledge graph, determine if they refer to the same \
802                 real-world entity. Respond only with JSON.",
803            ),
804            Message::from_legacy(Role::User, prompt),
805        ];
806
807        let response = match provider.chat(&messages).await {
808            Ok(r) => r,
809            Err(err) => {
810                tracing::warn!(error = %err, "LLM disambiguation chat failed");
811                return None;
812            }
813        };
814
815        // Parse JSON response, tolerating markdown code fences
816        let json_str = extract_json(&response);
817        match serde_json::from_str::<DisambiguationResponse>(json_str) {
818            Ok(parsed) => Some(parsed.same_entity),
819            Err(err) => {
820                tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
821                None
822            }
823        }
824    }
825
826    /// Resolve a batch of extracted entities concurrently.
827    ///
828    /// Returns a `Vec` of `(entity_id, ResolutionOutcome)` in the same order as input.
829    ///
830    /// # Errors
831    ///
832    /// Returns an error if any DB operation fails.
833    ///
834    /// # Panics
835    ///
836    /// Panics if an internal stream collection bug causes a result index to be missing.
837    /// This indicates a programming error and should never occur in correct usage.
838    pub async fn resolve_batch(
839        &self,
840        entities: &[ExtractedEntity],
841    ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
842        if entities.is_empty() {
843            return Ok(Vec::new());
844        }
845
846        // Process up to 4 embed+resolve operations concurrently (IC-S2/PERF-01).
847        let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
848
849        let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
850            let name = e.name.clone();
851            let entity_type = e.entity_type.clone();
852            let summary = e.summary.clone();
853            async move {
854                let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
855                (i, result)
856            }
857        }))
858        .buffer_unordered(4);
859
860        while let Some((i, result)) = stream.next().await {
861            match result {
862                Ok(outcome) => results[i] = Some(outcome),
863                Err(err) => return Err(err),
864            }
865        }
866
867        Ok(results
868            .into_iter()
869            .enumerate()
870            .map(|(i, r)| {
871                r.unwrap_or_else(|| {
872                    tracing::warn!(
873                        index = i,
874                        "resolve_batch: missing result at index — bug in stream collection"
875                    );
876                    panic!("resolve_batch: missing result at index {i}")
877                })
878            })
879            .collect())
880    }
881
882    /// Resolve an extracted edge: deduplicate or supersede existing edges.
883    ///
884    /// - If an active edge with the same direction and relation exists with an identical fact,
885    ///   returns `None` (deduplicated).
886    /// - If an active edge with the same direction and relation exists with a different fact,
887    ///   invalidates the old edge and inserts the new one, returning `Some(new_id)`.
888    /// - If no matching edge exists, inserts a new edge and returns `Some(new_id)`.
889    ///
890    /// Relation and fact strings are sanitized (control chars stripped, length-capped).
891    ///
892    /// # Errors
893    ///
894    /// Returns an error if any database operation fails.
895    pub async fn resolve_edge(
896        &self,
897        source_id: i64,
898        target_id: i64,
899        relation: &str,
900        fact: &str,
901        confidence: f32,
902        episode_id: Option<MessageId>,
903    ) -> Result<Option<i64>, MemoryError> {
904        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
905        let normalized_relation =
906            truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
907
908        let fact_clean = strip_control_chars(fact.trim());
909        let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
910
911        // Fetch only exact-direction edges — no reverse edges to filter out
912        let existing_edges = self.store.edges_exact(source_id, target_id).await?;
913
914        let matching = existing_edges
915            .iter()
916            .find(|e| e.relation == normalized_relation);
917
918        if let Some(old) = matching {
919            if old.fact == normalized_fact {
920                // Exact duplicate — skip
921                return Ok(None);
922            }
923            // Same relation, different fact — supersede
924            self.store.invalidate_edge(old.id).await?;
925        }
926
927        let new_id = self
928            .store
929            .insert_edge(
930                source_id,
931                target_id,
932                &normalized_relation,
933                &normalized_fact,
934                confidence,
935                episode_id,
936            )
937            .await?;
938        Ok(Some(new_id))
939    }
940
941    /// Resolve a typed edge: deduplicate or supersede existing edges of the same type.
942    ///
943    /// Identical to [`Self::resolve_edge`] but includes `edge_type` in the matching key.
944    /// An active edge with the same `(source, target, relation, edge_type)` and identical
945    /// fact returns `None`; same relation+type with different fact is superseded.
946    ///
947    /// When `belief_revision` is `Some`, uses semantic contradiction detection to find edges
948    /// to supersede across the same relation domain. The new fact embedding is pre-computed
949    /// here (one embed call) to avoid N+1 embedding calls.
950    ///
951    /// This ensures that different MAGMA edge types for the same entity pair are stored
952    /// independently (critic mitigation: dedup key includes `edge_type`).
953    ///
954    /// # Errors
955    ///
956    /// Returns an error if any database operation fails.
957    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
958    pub async fn resolve_edge_typed(
959        &self,
960        source_id: i64,
961        target_id: i64,
962        relation: &str,
963        fact: &str,
964        confidence: f32,
965        episode_id: Option<crate::types::MessageId>,
966        edge_type: crate::graph::EdgeType,
967        belief_revision: Option<&crate::graph::BeliefRevisionConfig>,
968    ) -> Result<Option<i64>, MemoryError> {
969        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
970        let normalized_relation =
971            truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
972
973        let fact_clean = strip_control_chars(fact.trim());
974        let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
975
976        let existing_edges = self.store.edges_exact(source_id, target_id).await?;
977
978        // Exact dedup: same (relation, edge_type, fact) → skip.
979        let matching = existing_edges
980            .iter()
981            .find(|e| e.relation == normalized_relation && e.edge_type == edge_type);
982
983        if matching.is_some_and(|old| old.fact == normalized_fact) {
984            return Ok(None);
985        }
986
987        // Determine which edges to supersede.
988        let superseded_ids: Vec<i64> = if let (Some(cfg), Some(provider)) =
989            (belief_revision, self.provider)
990        {
991            // Kumiho belief revision: embed new fact once, find semantically contradicted edges.
992            match tokio::time::timeout(self.embed_timeout, provider.embed(&normalized_fact)).await {
993                Ok(Ok(new_emb)) => {
994                    match crate::graph::belief_revision::find_superseded_edges(
995                        &existing_edges,
996                        &new_emb,
997                        &normalized_relation,
998                        edge_type,
999                        provider,
1000                        cfg,
1001                        self.embed_timeout,
1002                    )
1003                    .await
1004                    {
1005                        Ok(ids) => ids,
1006                        Err(err) => {
1007                            tracing::warn!(error = %err,
1008                                    "belief_revision: find_superseded_edges failed, falling back to exact match");
1009                            matching.map(|e| vec![e.id]).unwrap_or_default()
1010                        }
1011                    }
1012                }
1013                Ok(Err(err)) => {
1014                    tracing::warn!(error = %err,
1015                            "belief_revision: embed new fact failed, falling back to exact match");
1016                    matching.map(|e| vec![e.id]).unwrap_or_default()
1017                }
1018                Err(_) => {
1019                    tracing::warn!(
1020                        "belief_revision: embed new fact timed out, falling back to exact match"
1021                    );
1022                    matching.map(|e| vec![e.id]).unwrap_or_default()
1023                }
1024            }
1025        } else {
1026            // Legacy: exact (relation, edge_type) match with different fact.
1027            matching.map(|e| vec![e.id]).unwrap_or_default()
1028        };
1029
1030        let new_id = self
1031            .store
1032            .insert_edge_typed(
1033                source_id,
1034                target_id,
1035                &normalized_relation,
1036                &normalized_fact,
1037                confidence,
1038                episode_id,
1039                edge_type,
1040            )
1041            .await?;
1042
1043        // Supersede old edges with audit trail (belief revision) or plain invalidation (legacy).
1044        for old_id in superseded_ids {
1045            if belief_revision.is_some() {
1046                self.store
1047                    .invalidate_edge_with_supersession(old_id, new_id)
1048                    .await?;
1049            } else {
1050                self.store.invalidate_edge(old_id).await?;
1051            }
1052        }
1053
1054        Ok(Some(new_id))
1055    }
1056}
1057
1058/// Extract a JSON object from a string that may contain markdown code fences.
1059fn extract_json(s: &str) -> &str {
1060    let trimmed = s.trim();
1061    // Strip ```json ... ``` or ``` ... ```
1062    if let Some(inner) = trimmed.strip_prefix("```json")
1063        && let Some(end) = inner.rfind("```")
1064    {
1065        return inner[..end].trim();
1066    }
1067    if let Some(inner) = trimmed.strip_prefix("```")
1068        && let Some(end) = inner.rfind("```")
1069    {
1070        return inner[..end].trim();
1071    }
1072    // Find first '{' to last '}'
1073    if let (Some(start), Some(end)) = (trimmed.find('{'), trimmed.rfind('}'))
1074        && start <= end
1075    {
1076        return &trimmed[start..=end];
1077    }
1078    trimmed
1079}
1080
1081#[cfg(test)]
1082mod tests;