Skip to main content

zeph_memory/graph/resolver/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_common::sanitize::strip_control_chars;
12use zeph_common::text::truncate_to_bytes_ref;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{LlmProvider as _, Message, Role};
15
16use super::store::GraphStore;
17use super::types::EntityType;
18use crate::embedding_store::EmbeddingStore;
19use crate::error::MemoryError;
20use crate::graph::extractor::ExtractedEntity;
21use crate::types::MessageId;
22use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
23
/// Minimum byte length for entity names — rejects noise tokens like "go", "cd".
///
/// Checked against the normalized name; `EntityResolver::resolve` returns an error for
/// anything shorter.
const MIN_ENTITY_NAME_BYTES: usize = 3;
/// Maximum byte length for entity names stored in the graph.
///
/// Normalized names are truncated to this many bytes.
const MAX_ENTITY_NAME_BYTES: usize = 512;
/// Maximum byte length for relation strings.
///
/// NOTE(review): not referenced in the visible part of this module — presumably used
/// further down the file; confirm.
const MAX_RELATION_BYTES: usize = 256;
/// Maximum byte length for fact strings.
///
/// Also used as the cap for summaries fed to `embed()` and for merged summaries.
const MAX_FACT_BYTES: usize = 2048;

/// Qdrant collection for entity embeddings.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Timeout for a single `embed()` call in seconds.
const EMBED_TIMEOUT_SECS: u64 = 30;
38
/// Outcome of an entity resolution attempt.
///
/// Returned alongside the entity id so callers can tell how the id was obtained.
#[derive(Debug, Clone, PartialEq)]
pub enum ResolutionOutcome {
    /// Exact name+type match in `SQLite` (alias or canonical-name lookup).
    ExactMatch,
    /// Cosine similarity >= merge threshold; score is the cosine similarity value.
    EmbeddingMatch { score: f32 },
    /// LLM confirmed merge in ambiguous similarity range.
    LlmDisambiguated,
    /// New entity was created.
    Created,
}
51
/// LLM response for entity disambiguation.
///
/// Deserialized from the JSON object the model is asked to return
/// (`{"same_entity": true}` or `{"same_entity": false}`).
#[derive(Debug, Deserialize, JsonSchema)]
struct DisambiguationResponse {
    /// `true` when the model judges both mentions to be the same real-world entity.
    same_entity: bool,
}
57
/// Per-entity-name lock map used to prevent concurrent duplicate creation.
///
/// Keyed by normalized entity name; each value is an async mutex shared through an `Arc`.
/// Entities with different names resolve concurrently; entities with the same name are
/// serialized.
///
/// TODO(SEC-M33-02): This map grows unboundedly — one entry per unique normalized name.
/// For a short-lived resolver this is acceptable. If the resolver becomes long-lived
/// (stored in `SemanticMemory`), add eviction or use a fixed-size sharded lock array.
type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
67
/// Resolves extracted entity mentions to graph entity ids, creating entities on demand.
///
/// Borrows the graph store and, optionally, an embedding store and an LLM provider.
/// Without both optional parts only exact alias/canonical-name matching is performed;
/// see [`EntityResolver::resolve`] for the full pipeline.
pub struct EntityResolver<'a> {
    /// Graph store used for entity/alias lookups and upserts.
    store: &'a GraphStore,
    /// Vector store holding entity embeddings; embedding-based matching is skipped when `None`.
    embedding_store: Option<&'a Arc<EmbeddingStore>>,
    /// Provider used for `embed()` and disambiguation chat; embedding-based matching is
    /// skipped when `None`.
    provider: Option<&'a AnyProvider>,
    /// Candidates scoring at or above this value are merged without asking the LLM.
    similarity_threshold: f32,
    /// Candidates scoring at or above this value (and below `similarity_threshold`) are
    /// sent to LLM disambiguation.
    ambiguous_threshold: f32,
    /// Per-normalized-name locks serializing resolution of identical names.
    name_locks: NameLockMap,
    /// Counter for error-triggered fallbacks (embed/LLM failures). Tests can read this via Arc.
    fallback_count: Arc<std::sync::atomic::AtomicU64>,
    /// Ensures `ensure_named_collection()` is called at most once per resolver lifetime.
    ///
    /// Arc-wrapped so future clones or spawned tasks can share the same gate.
    collection_ensured: Arc<tokio::sync::OnceCell<()>>,
}
82
83impl<'a> EntityResolver<'a> {
84    #[must_use]
85    pub fn new(store: &'a GraphStore) -> Self {
86        Self {
87            store,
88            embedding_store: None,
89            provider: None,
90            similarity_threshold: 0.85,
91            ambiguous_threshold: 0.70,
92            name_locks: Arc::new(DashMap::new()),
93            fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
94            collection_ensured: Arc::new(tokio::sync::OnceCell::new()),
95        }
96    }
97
98    #[must_use]
99    pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
100        self.embedding_store = Some(store);
101        self
102    }
103
104    #[must_use]
105    pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
106        self.provider = Some(provider);
107        self
108    }
109
110    #[must_use]
111    pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
112        self.similarity_threshold = similarity;
113        self.ambiguous_threshold = ambiguous;
114        self
115    }
116
117    /// Shared fallback counter — tests can clone this Arc to inspect the value.
118    #[must_use]
119    pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
120        Arc::clone(&self.fallback_count)
121    }
122
123    /// Normalize an entity name: trim, lowercase, strip control chars, truncate.
124    fn normalize_name(name: &str) -> String {
125        let lowered = name.trim().to_lowercase();
126        let cleaned = strip_control_chars(&lowered);
127        let normalized = truncate_to_bytes_ref(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
128        if normalized.len() < cleaned.len() {
129            tracing::debug!(
130                "graph resolver: entity name truncated to {} bytes",
131                MAX_ENTITY_NAME_BYTES
132            );
133        }
134        normalized
135    }
136
137    /// Parse an entity type string, falling back to `Concept` on unknown values.
138    fn parse_entity_type(entity_type: &str) -> EntityType {
139        entity_type
140            .trim()
141            .to_lowercase()
142            .parse::<EntityType>()
143            .unwrap_or_else(|_| {
144                tracing::debug!(
145                    "graph resolver: unknown entity type {:?}, falling back to Concept",
146                    entity_type
147                );
148                EntityType::Concept
149            })
150    }
151
152    /// Acquire the per-name lock and return the guard. Keeps lock alive for the caller.
153    async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
154        let lock = self
155            .name_locks
156            .entry(normalized.to_owned())
157            .or_insert_with(|| Arc::new(Mutex::new(())))
158            .clone();
159        lock.lock_owned().await
160    }
161
162    /// Resolve an extracted entity using the alias-first canonicalization pipeline.
163    ///
164    /// Pipeline:
165    /// 1. Normalize: trim, lowercase, strip control chars, truncate to 512 bytes.
166    /// 2. Parse entity type (fallback to Concept on unknown).
167    /// 3. Alias lookup: search `graph_entity_aliases` by normalized name + `entity_type`.
168    ///    If found, touch `last_seen_at` and return the existing entity id.
169    /// 4. Canonical name lookup: search `graph_entities` by `canonical_name` + `entity_type`.
170    ///    If found, touch `last_seen_at` and return the existing entity id.
171    /// 5. When `embedding_store` and `provider` are configured, performs embedding-based fuzzy
172    ///    matching: cosine similarity search (Qdrant), LLM disambiguation for ambiguous range,
173    ///    merge or create based on result. Failures degrade gracefully to step 6.
174    /// 6. Create: upsert new entity with `canonical_name` = normalized name.
175    /// 7. Register the normalized form (and original trimmed form if different) as aliases.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if the entity name is empty after normalization, or if a DB operation fails.
180    pub async fn resolve(
181        &self,
182        name: &str,
183        entity_type: &str,
184        summary: Option<&str>,
185    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
186        let normalized = Self::normalize_name(name);
187
188        if normalized.is_empty() {
189            return Err(MemoryError::GraphStore("empty entity name".into()));
190        }
191
192        if normalized.len() < MIN_ENTITY_NAME_BYTES {
193            return Err(MemoryError::GraphStore(format!(
194                "entity name too short: {normalized:?} ({} bytes, min {MIN_ENTITY_NAME_BYTES})",
195                normalized.len()
196            )));
197        }
198
199        let et = Self::parse_entity_type(entity_type);
200
201        // The surface form preserves the original casing for user-facing display.
202        let surface_name = name.trim().to_owned();
203
204        // Acquire per-name lock to prevent concurrent duplicate creation.
205        let _guard = self.lock_name(&normalized).await;
206
207        // Step 3: alias-first lookup (filters by entity_type to prevent cross-type collisions).
208        if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
209            self.store
210                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
211                .await?;
212            return Ok((entity.id, ResolutionOutcome::ExactMatch));
213        }
214
215        // Step 4: canonical name lookup.
216        if let Some(entity) = self.store.find_entity(&normalized, et).await? {
217            self.store
218                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
219                .await?;
220            return Ok((entity.id, ResolutionOutcome::ExactMatch));
221        }
222
223        // Step 5: Embedding-based resolution (when configured).
224        if let Some(outcome) = self
225            .resolve_via_embedding(&normalized, name, &surface_name, et, summary)
226            .await?
227        {
228            return Ok(outcome);
229        }
230
231        // Step 6: Create new entity (no embedding store, or embedding failure).
232        let entity_id = self
233            .store
234            .upsert_entity(&surface_name, &normalized, et, summary)
235            .await?;
236
237        self.register_aliases(entity_id, &normalized, name).await?;
238
239        Ok((entity_id, ResolutionOutcome::Created))
240    }
241
242    /// Compute embedding for an entity, incrementing `fallback_count` on failure/timeout.
243    /// Returns `None` when embedding is unavailable (caller should skip vector operations).
244    async fn embed_entity_text(
245        &self,
246        provider: &AnyProvider,
247        normalized: &str,
248        summary: Option<&str>,
249    ) -> Option<Vec<f32>> {
250        let safe_summary = truncate_to_bytes_ref(summary.unwrap_or(""), MAX_FACT_BYTES);
251        let embed_text = format!("{normalized}: {safe_summary}");
252        let embed_result = tokio::time::timeout(
253            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
254            provider.embed(&embed_text),
255        )
256        .await;
257        match embed_result {
258            Ok(Ok(v)) => Some(v),
259            Ok(Err(err)) => {
260                self.fallback_count
261                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
262                tracing::warn!(entity_name = %normalized, error = %err,
263                    "embed() failed; falling back to exact-match-only entity creation");
264                None
265            }
266            Err(_timeout) => {
267                self.fallback_count
268                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
269                tracing::warn!(entity_name = %normalized,
270                    "embed() timed out after {}s; falling back to create new entity",
271                    EMBED_TIMEOUT_SECS);
272                None
273            }
274        }
275    }
276
277    /// Handle a candidate in the ambiguous score range by running LLM disambiguation.
278    /// Returns `Ok(Some(...))` if the LLM confirms a match, `Ok(None)` to fall through to create.
279    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
280    async fn handle_ambiguous_candidate(
281        &self,
282        emb_store: &EmbeddingStore,
283        provider: &AnyProvider,
284        payload: &std::collections::HashMap<String, serde_json::Value>,
285        point_id: &str,
286        score: f32,
287        surface_name: &str,
288        normalized: &str,
289        et: EntityType,
290        summary: Option<&str>,
291    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
292        let entity_id = payload
293            .get("entity_id")
294            .and_then(serde_json::Value::as_i64)
295            .ok_or_else(|| MemoryError::GraphStore("missing entity_id in payload".into()))?;
296        let existing_name = payload
297            .get("name")
298            .and_then(|v| v.as_str())
299            .unwrap_or("")
300            .to_owned();
301        let existing_summary = payload
302            .get("summary")
303            .and_then(|v| v.as_str())
304            .unwrap_or("")
305            .to_owned();
306        // Use the existing entity's actual type from the payload (IC-S3)
307        let existing_type = payload
308            .get("entity_type")
309            .and_then(|v| v.as_str())
310            .unwrap_or(et.as_str())
311            .to_owned();
312        let existing_canonical = payload.get("canonical_name").and_then(|v| v.as_str());
313        let existing_summary_str = payload.get("summary").and_then(|v| v.as_str());
314        match self
315            .llm_disambiguate(
316                provider,
317                normalized,
318                et.as_str(),
319                summary.unwrap_or(""),
320                &existing_name,
321                &existing_type,
322                &existing_summary,
323                score,
324            )
325            .await
326        {
327            Some(true) => {
328                self.merge_entity(
329                    emb_store,
330                    provider,
331                    entity_id,
332                    surface_name,
333                    normalized,
334                    et,
335                    summary,
336                    existing_canonical,
337                    existing_summary_str,
338                    Some(point_id),
339                )
340                .await?;
341                Ok(Some((entity_id, ResolutionOutcome::LlmDisambiguated)))
342            }
343            Some(false) => Ok(None),
344            None => {
345                self.fallback_count
346                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
347                tracing::warn!(entity_name = %normalized,
348                    "LLM disambiguation failed; falling back to create new entity");
349                Ok(None)
350            }
351        }
352    }
353
354    /// Attempt embedding-based resolution. Returns `Ok(Some(...))` if resolved (early return),
355    /// `Ok(None)` if no match found (caller should fall through to create), or `Err` on DB error.
356    async fn resolve_via_embedding(
357        &self,
358        normalized: &str,
359        original_name: &str,
360        surface_name: &str,
361        et: EntityType,
362        summary: Option<&str>,
363    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
364        let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) else {
365            return Ok(None);
366        };
367
368        let Some(query_vec) = self.embed_entity_text(provider, normalized, summary).await else {
369            return Ok(None);
370        };
371
372        let type_filter = VectorFilter {
373            must: vec![FieldCondition {
374                field: "entity_type".into(),
375                value: FieldValue::Text(et.as_str().to_owned()),
376            }],
377            must_not: vec![],
378        };
379        let candidates = match emb_store
380            .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
381            .await
382        {
383            Ok(c) => c,
384            Err(err) => {
385                self.fallback_count
386                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
387                tracing::warn!(entity_name = %normalized, error = %err,
388                    "Qdrant search failed; falling back to create new entity");
389                return self
390                    .create_with_embedding(
391                        emb_store,
392                        surface_name,
393                        normalized,
394                        original_name,
395                        et,
396                        summary,
397                        &query_vec,
398                    )
399                    .await
400                    .map(Some);
401            }
402        };
403
404        if let Some(best) = candidates.first() {
405            let score = best.score;
406            if score >= self.similarity_threshold {
407                let entity_id = best
408                    .payload
409                    .get("entity_id")
410                    .and_then(serde_json::Value::as_i64)
411                    .ok_or_else(|| {
412                        MemoryError::GraphStore("missing entity_id in payload".into())
413                    })?;
414                let existing_canonical =
415                    best.payload.get("canonical_name").and_then(|v| v.as_str());
416                let existing_summary = best.payload.get("summary").and_then(|v| v.as_str());
417                let existing_pid = Some(best.id.as_str());
418                self.merge_entity(
419                    emb_store,
420                    provider,
421                    entity_id,
422                    surface_name,
423                    normalized,
424                    et,
425                    summary,
426                    existing_canonical,
427                    existing_summary,
428                    existing_pid,
429                )
430                .await?;
431                return Ok(Some((
432                    entity_id,
433                    ResolutionOutcome::EmbeddingMatch { score },
434                )));
435            } else if score >= self.ambiguous_threshold
436                && let Some(result) = self
437                    .handle_ambiguous_candidate(
438                        emb_store,
439                        provider,
440                        &best.payload,
441                        &best.id,
442                        score,
443                        surface_name,
444                        normalized,
445                        et,
446                        summary,
447                    )
448                    .await?
449            {
450                return Ok(Some(result));
451            }
452            // score < ambiguous_threshold or LLM said different: fall through to create with embedding
453        }
454
455        // No suitable match — create new entity and store embedding.
456        self.create_with_embedding(
457            emb_store,
458            surface_name,
459            normalized,
460            original_name,
461            et,
462            summary,
463            &query_vec,
464        )
465        .await
466        .map(Some)
467    }
468
469    /// Create a new entity, register aliases, and store its embedding in Qdrant.
470    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
471    async fn create_with_embedding(
472        &self,
473        emb_store: &EmbeddingStore,
474        surface_name: &str,
475        normalized: &str,
476        original_name: &str,
477        et: EntityType,
478        summary: Option<&str>,
479        query_vec: &[f32],
480    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
481        let entity_id = self
482            .store
483            .upsert_entity(surface_name, normalized, et, summary)
484            .await?;
485        self.register_aliases(entity_id, normalized, original_name)
486            .await?;
487        self.store_entity_embedding(
488            emb_store,
489            entity_id,
490            None,
491            normalized,
492            et,
493            summary.unwrap_or(""),
494            query_vec,
495        )
496        .await;
497        Ok((entity_id, ResolutionOutcome::Created))
498    }
499
500    /// Register the normalized form and original trimmed form as aliases for an entity.
501    async fn register_aliases(
502        &self,
503        entity_id: i64,
504        normalized: &str,
505        original_name: &str,
506    ) -> Result<(), MemoryError> {
507        self.store.add_alias(entity_id, normalized).await?;
508
509        // Also register the original trimmed lowercased form if it differs from normalized
510        // (e.g. when control chars were stripped, leaving a shorter string).
511        let original_trimmed = original_name.trim().to_lowercase();
512        let original_clean_str = strip_control_chars(&original_trimmed);
513        let original_clean = truncate_to_bytes_ref(&original_clean_str, MAX_ENTITY_NAME_BYTES);
514        if original_clean != normalized {
515            self.store.add_alias(entity_id, original_clean).await?;
516        }
517
518        Ok(())
519    }
520
521    /// Merge an existing entity with new information: combine summaries, update Qdrant.
522    ///
523    /// `existing_canonical_name` and `existing_summary` are read from the Qdrant payload at
524    /// the call site (hot path, no `SQLite` roundtrip). Pass `None` for either when the point
525    /// predates the payload fields — a targeted `find_entity_by_id` read is then used as a
526    /// one-time fallback (legacy transition path, removed after all points are rewritten).
527    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
528    async fn merge_entity(
529        &self,
530        emb_store: &EmbeddingStore,
531        provider: &AnyProvider,
532        entity_id: i64,
533        new_surface_name: &str,
534        new_canonical_name: &str,
535        entity_type: EntityType,
536        new_summary: Option<&str>,
537        existing_canonical_name: Option<&str>,
538        existing_summary_payload: Option<&str>,
539        existing_point_id: Option<&str>,
540    ) -> Result<(), MemoryError> {
541        // Hot path: use values from the Qdrant payload (no SQLite roundtrip).
542        // Both canonical_name AND summary must be present; if either is absent the point
543        // predates the payload field — fall back to a targeted SQLite read to avoid
544        // silently dropping a summary that was written outside merge_entity.
545        let (existing_canonical, existing_summary, existing_point_id_owned) =
546            if existing_canonical_name.is_some() && existing_summary_payload.is_some() {
547                (
548                    existing_canonical_name
549                        .unwrap_or(new_canonical_name)
550                        .to_owned(),
551                    existing_summary_payload.unwrap_or("").to_owned(),
552                    existing_point_id.map(ToOwned::to_owned),
553                )
554            } else {
555                // Transition-period fallback. Legacy Qdrant points pre-date the payload
556                // fields; one targeted read is acceptable until the embedding is rewritten
557                // on the next merge. Also used when canonical_name is present but summary
558                // is absent — prevents overwriting a SQLite summary with empty string.
559                let existing = self.store.find_entity_by_id(entity_id).await?;
560                let canonical = existing_canonical_name.map_or_else(
561                    || {
562                        existing.as_ref().map_or_else(
563                            || new_canonical_name.to_owned(),
564                            |e| e.canonical_name.clone(),
565                        )
566                    },
567                    ToOwned::to_owned,
568                );
569                let summary = existing
570                    .as_ref()
571                    .and_then(|e| e.summary.as_deref())
572                    .unwrap_or("")
573                    .to_owned();
574                let pid = existing_point_id.map(ToOwned::to_owned).or_else(|| {
575                    existing
576                        .as_ref()
577                        .and_then(|e| e.qdrant_point_id.as_deref())
578                        .map(ToOwned::to_owned)
579                });
580                (canonical, summary, pid)
581            };
582
583        let merged_summary = if let Some(new) = new_summary {
584            if !new.is_empty() && !existing_summary.is_empty() {
585                let combined = format!("{existing_summary}; {new}");
586                // TODO(S2): use LLM-based summary merge when summary exceeds 512 bytes
587                truncate_to_bytes_ref(&combined, MAX_FACT_BYTES).to_owned()
588            } else if !new.is_empty() {
589                new.to_owned()
590            } else {
591                existing_summary.clone()
592            }
593        } else {
594            existing_summary.clone()
595        };
596
597        let summary_opt = if merged_summary.is_empty() {
598            None
599        } else {
600            Some(merged_summary.as_str())
601        };
602
603        // Preserve the existing display name from the payload; fall back to the incoming surface
604        // name only for brand-new entities (where the payload had no "name" field).
605        self.store
606            .upsert_entity(
607                new_surface_name,
608                &existing_canonical,
609                entity_type,
610                summary_opt,
611            )
612            .await?;
613
614        // Re-embed merged text and upsert to Qdrant
615        let embed_text = format!("{new_surface_name}: {merged_summary}");
616        let embed_result = tokio::time::timeout(
617            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
618            provider.embed(&embed_text),
619        )
620        .await;
621
622        match embed_result {
623            Ok(Ok(vec)) => {
624                self.store_entity_embedding(
625                    emb_store,
626                    entity_id,
627                    existing_point_id_owned.as_deref(),
628                    new_surface_name,
629                    entity_type,
630                    &merged_summary,
631                    &vec,
632                )
633                .await;
634            }
635            Ok(Err(err)) => {
636                tracing::warn!(
637                    entity_id,
638                    error = %err,
639                    "merge re-embed failed; Qdrant entry may be stale"
640                );
641            }
642            Err(_) => {
643                tracing::warn!(
644                    entity_id,
645                    "merge re-embed timed out; Qdrant entry may be stale"
646                );
647            }
648        }
649
650        Ok(())
651    }
652
653    /// Store an entity embedding in Qdrant and update `qdrant_point_id` in `SQLite`.
654    ///
655    /// When `existing_point_id` is `Some`, the existing Qdrant point is updated in-place
656    /// (upsert by ID) to avoid orphaned stale points. When `None`, a new point is created.
657    ///
658    /// Failures are logged at warn level but do not propagate — the entity is still
659    /// valid in `SQLite` even if Qdrant upsert fails.
660    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
661    async fn store_entity_embedding(
662        &self,
663        emb_store: &EmbeddingStore,
664        entity_id: i64,
665        existing_point_id: Option<&str>,
666        name: &str,
667        entity_type: EntityType,
668        summary: &str,
669        vector: &[f32],
670    ) {
671        // Ensure the Qdrant collection exists exactly once per resolver lifetime.
672        // All entity embeddings use the same model dimension, so the first call's
673        // vector_size wins — subsequent entities share the same collection.
674        // On error the cell stays unset, so the next entity retries — transient
675        // network failures do not permanently disable embedding storage.
676        let vector_size = u64::try_from(vector.len()).unwrap_or(384);
677        let collection_ensured = Arc::clone(&self.collection_ensured);
678        if let Err(err) = collection_ensured
679            .get_or_try_init(|| async {
680                emb_store
681                    .ensure_named_collection(ENTITY_COLLECTION, vector_size)
682                    .await
683            })
684            .await
685        {
686            tracing::error!(
687                error = %err,
688                "failed to ensure entity embedding collection; skipping Qdrant upsert"
689            );
690            return;
691        }
692
693        let payload = serde_json::json!({
694            "entity_id": entity_id,
695            // String mirror of entity_id for scroll_all enumeration (scroll_all only surfaces
696            // StringValue payload fields; the i64 entity_id is preserved for existing search
697            // consumers which read it directly from ScoredVectorPoint.payload).
698            "entity_id_str": entity_id.to_string(),
699            "canonical_name": name,
700            "name": name,
701            "entity_type": entity_type.as_str(),
702            "summary": summary,
703        });
704
705        if let Some(point_id) = existing_point_id {
706            // Reuse existing point to avoid orphaned stale points (IC-S1)
707            if let Err(err) = emb_store
708                .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
709                .await
710            {
711                tracing::warn!(
712                    entity_id,
713                    error = %err,
714                    "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
715                );
716            }
717        } else {
718            match emb_store
719                .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
720                .await
721            {
722                Ok(point_id) => {
723                    if let Err(err) = self
724                        .store
725                        .set_entity_qdrant_point_id(entity_id, &point_id)
726                        .await
727                    {
728                        tracing::warn!(
729                            entity_id,
730                            error = %err,
731                            "failed to store qdrant_point_id in SQLite"
732                        );
733                    }
734                }
735                Err(err) => {
736                    tracing::warn!(
737                        entity_id,
738                        error = %err,
739                        "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
740                    );
741                }
742            }
743        }
744    }
745
746    /// Ask the LLM whether two entities are the same.
747    ///
748    /// Returns `Some(true)` for merge, `Some(false)` for separate, `None` on failure.
749    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
750    async fn llm_disambiguate(
751        &self,
752        provider: &AnyProvider,
753        new_name: &str,
754        new_type: &str,
755        new_summary: &str,
756        existing_name: &str,
757        existing_type: &str,
758        existing_summary: &str,
759        score: f32,
760    ) -> Option<bool> {
761        let prompt = format!(
762            "New entity:\n\
763             - Name: <external-data>{new_name}</external-data>\n\
764             - Type: <external-data>{new_type}</external-data>\n\
765             - Summary: <external-data>{new_summary}</external-data>\n\
766             \n\
767             Existing entity:\n\
768             - Name: <external-data>{existing_name}</external-data>\n\
769             - Type: <external-data>{existing_type}</external-data>\n\
770             - Summary: <external-data>{existing_summary}</external-data>\n\
771             \n\
772             Cosine similarity: {score:.2}\n\
773             \n\
774             Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
775        );
776
777        let messages = [
778            Message::from_legacy(
779                Role::System,
780                "You are an entity disambiguation assistant. Given a new entity mention and \
781                 an existing entity from the knowledge graph, determine if they refer to the same \
782                 real-world entity. Respond only with JSON.",
783            ),
784            Message::from_legacy(Role::User, prompt),
785        ];
786
787        let response = match provider.chat(&messages).await {
788            Ok(r) => r,
789            Err(err) => {
790                tracing::warn!(error = %err, "LLM disambiguation chat failed");
791                return None;
792            }
793        };
794
795        // Parse JSON response, tolerating markdown code fences
796        let json_str = extract_json(&response);
797        match serde_json::from_str::<DisambiguationResponse>(json_str) {
798            Ok(parsed) => Some(parsed.same_entity),
799            Err(err) => {
800                tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
801                None
802            }
803        }
804    }
805
806    /// Resolve a batch of extracted entities concurrently.
807    ///
808    /// Returns a `Vec` of `(entity_id, ResolutionOutcome)` in the same order as input.
809    ///
810    /// # Errors
811    ///
812    /// Returns an error if any DB operation fails.
813    ///
814    /// # Panics
815    ///
816    /// Panics if an internal stream collection bug causes a result index to be missing.
817    /// This indicates a programming error and should never occur in correct usage.
818    pub async fn resolve_batch(
819        &self,
820        entities: &[ExtractedEntity],
821    ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
822        if entities.is_empty() {
823            return Ok(Vec::new());
824        }
825
826        // Process up to 4 embed+resolve operations concurrently (IC-S2/PERF-01).
827        let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
828
829        let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
830            let name = e.name.clone();
831            let entity_type = e.entity_type.clone();
832            let summary = e.summary.clone();
833            async move {
834                let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
835                (i, result)
836            }
837        }))
838        .buffer_unordered(4);
839
840        while let Some((i, result)) = stream.next().await {
841            match result {
842                Ok(outcome) => results[i] = Some(outcome),
843                Err(err) => return Err(err),
844            }
845        }
846
847        Ok(results
848            .into_iter()
849            .enumerate()
850            .map(|(i, r)| {
851                r.unwrap_or_else(|| {
852                    tracing::warn!(
853                        index = i,
854                        "resolve_batch: missing result at index — bug in stream collection"
855                    );
856                    panic!("resolve_batch: missing result at index {i}")
857                })
858            })
859            .collect())
860    }
861
862    /// Resolve an extracted edge: deduplicate or supersede existing edges.
863    ///
864    /// - If an active edge with the same direction and relation exists with an identical fact,
865    ///   returns `None` (deduplicated).
866    /// - If an active edge with the same direction and relation exists with a different fact,
867    ///   invalidates the old edge and inserts the new one, returning `Some(new_id)`.
868    /// - If no matching edge exists, inserts a new edge and returns `Some(new_id)`.
869    ///
870    /// Relation and fact strings are sanitized (control chars stripped, length-capped).
871    ///
872    /// # Errors
873    ///
874    /// Returns an error if any database operation fails.
875    pub async fn resolve_edge(
876        &self,
877        source_id: i64,
878        target_id: i64,
879        relation: &str,
880        fact: &str,
881        confidence: f32,
882        episode_id: Option<MessageId>,
883    ) -> Result<Option<i64>, MemoryError> {
884        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
885        let normalized_relation =
886            truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
887
888        let fact_clean = strip_control_chars(fact.trim());
889        let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
890
891        // Fetch only exact-direction edges — no reverse edges to filter out
892        let existing_edges = self.store.edges_exact(source_id, target_id).await?;
893
894        let matching = existing_edges
895            .iter()
896            .find(|e| e.relation == normalized_relation);
897
898        if let Some(old) = matching {
899            if old.fact == normalized_fact {
900                // Exact duplicate — skip
901                return Ok(None);
902            }
903            // Same relation, different fact — supersede
904            self.store.invalidate_edge(old.id).await?;
905        }
906
907        let new_id = self
908            .store
909            .insert_edge(
910                source_id,
911                target_id,
912                &normalized_relation,
913                &normalized_fact,
914                confidence,
915                episode_id,
916            )
917            .await?;
918        Ok(Some(new_id))
919    }
920
921    /// Resolve a typed edge: deduplicate or supersede existing edges of the same type.
922    ///
923    /// Identical to [`Self::resolve_edge`] but includes `edge_type` in the matching key.
924    /// An active edge with the same `(source, target, relation, edge_type)` and identical
925    /// fact returns `None`; same relation+type with different fact is superseded.
926    ///
927    /// When `belief_revision` is `Some`, uses semantic contradiction detection to find edges
928    /// to supersede across the same relation domain. The new fact embedding is pre-computed
929    /// here (one embed call) to avoid N+1 embedding calls.
930    ///
931    /// This ensures that different MAGMA edge types for the same entity pair are stored
932    /// independently (critic mitigation: dedup key includes `edge_type`).
933    ///
934    /// # Errors
935    ///
936    /// Returns an error if any database operation fails.
937    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
938    pub async fn resolve_edge_typed(
939        &self,
940        source_id: i64,
941        target_id: i64,
942        relation: &str,
943        fact: &str,
944        confidence: f32,
945        episode_id: Option<crate::types::MessageId>,
946        edge_type: crate::graph::EdgeType,
947        belief_revision: Option<&crate::graph::BeliefRevisionConfig>,
948    ) -> Result<Option<i64>, MemoryError> {
949        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
950        let normalized_relation =
951            truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
952
953        let fact_clean = strip_control_chars(fact.trim());
954        let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
955
956        let existing_edges = self.store.edges_exact(source_id, target_id).await?;
957
958        // Exact dedup: same (relation, edge_type, fact) → skip.
959        let matching = existing_edges
960            .iter()
961            .find(|e| e.relation == normalized_relation && e.edge_type == edge_type);
962
963        if matching.is_some_and(|old| old.fact == normalized_fact) {
964            return Ok(None);
965        }
966
967        // Determine which edges to supersede.
968        let superseded_ids: Vec<i64> = if let (Some(cfg), Some(provider)) =
969            (belief_revision, self.provider)
970        {
971            // Kumiho belief revision: embed new fact once, find semantically contradicted edges.
972            match tokio::time::timeout(
973                std::time::Duration::from_secs(5),
974                provider.embed(&normalized_fact),
975            )
976            .await
977            {
978                Ok(Ok(new_emb)) => {
979                    match crate::graph::belief_revision::find_superseded_edges(
980                        &existing_edges,
981                        &new_emb,
982                        &normalized_relation,
983                        edge_type,
984                        provider,
985                        cfg,
986                    )
987                    .await
988                    {
989                        Ok(ids) => ids,
990                        Err(err) => {
991                            tracing::warn!(error = %err,
992                                    "belief_revision: find_superseded_edges failed, falling back to exact match");
993                            matching.map(|e| vec![e.id]).unwrap_or_default()
994                        }
995                    }
996                }
997                Ok(Err(err)) => {
998                    tracing::warn!(error = %err,
999                            "belief_revision: embed new fact failed, falling back to exact match");
1000                    matching.map(|e| vec![e.id]).unwrap_or_default()
1001                }
1002                Err(_) => {
1003                    tracing::warn!(
1004                        "belief_revision: embed new fact timed out, falling back to exact match"
1005                    );
1006                    matching.map(|e| vec![e.id]).unwrap_or_default()
1007                }
1008            }
1009        } else {
1010            // Legacy: exact (relation, edge_type) match with different fact.
1011            matching.map(|e| vec![e.id]).unwrap_or_default()
1012        };
1013
1014        let new_id = self
1015            .store
1016            .insert_edge_typed(
1017                source_id,
1018                target_id,
1019                &normalized_relation,
1020                &normalized_fact,
1021                confidence,
1022                episode_id,
1023                edge_type,
1024            )
1025            .await?;
1026
1027        // Supersede old edges with audit trail (belief revision) or plain invalidation (legacy).
1028        for old_id in superseded_ids {
1029            if belief_revision.is_some() {
1030                self.store
1031                    .invalidate_edge_with_supersession(old_id, new_id)
1032                    .await?;
1033            } else {
1034                self.store.invalidate_edge(old_id).await?;
1035            }
1036        }
1037
1038        Ok(Some(new_id))
1039    }
1040}
1041
/// Extract a JSON object from a string that may contain markdown code fences.
///
/// Steps, applied to the whitespace-trimmed input:
///
/// 1. If the text starts with a ```` ```json ```` or bare ```` ``` ```` fence and contains a
///    closing ```` ``` ````, keep only the trimmed text between the opening fence and the
///    last closing fence.
/// 2. Within whatever remains, return the slice from the first `{` to the last `}`.
///    This also runs on fenced content, so an unrecognized language tag (for example
///    ```` ```JSON ````) or stray text inside the fence does not end up in the result.
/// 3. If there is no `{ … }` pair, return the remaining text unchanged.
///
/// The returned slice always borrows from `s`; nothing is allocated. No JSON validation is
/// performed here — the caller is expected to parse the result.
fn extract_json(s: &str) -> &str {
    let trimmed = s.trim();

    // Try the more specific opening fence first so the `json` tag is removed with it.
    let fenced_body = ["```json", "```"].iter().find_map(move |fence| {
        let inner = trimmed.strip_prefix(*fence)?;
        let close = inner.rfind("```")?;
        Some(inner[..close].trim())
    });
    let candidate = fenced_body.unwrap_or(trimmed);

    // Narrow to the outermost braces; `start <= end` rejects inputs such as "} {".
    match (candidate.find('{'), candidate.rfind('}')) {
        (Some(start), Some(end)) if start <= end => &candidate[start..=end],
        _ => candidate,
    }
}
1064
1065#[cfg(test)]
1066mod tests;