Skip to main content

zeph_memory/graph/resolver/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_common::sanitize::strip_control_chars;
12use zeph_common::text::truncate_to_bytes_ref;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{LlmProvider as _, Message, Role};
15
16use super::store::GraphStore;
17use super::types::EntityType;
18use crate::embedding_store::EmbeddingStore;
19use crate::error::MemoryError;
20use crate::graph::extractor::ExtractedEntity;
21use crate::types::MessageId;
22use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
23
/// Minimum byte length for entity names — rejects noise tokens like "go", "cd".
///
/// Compared against the UTF-8 byte length of the normalized name, not its character count.
const MIN_ENTITY_NAME_BYTES: usize = 3;
/// Maximum byte length for entity names stored in the graph.
const MAX_ENTITY_NAME_BYTES: usize = 512;
/// Maximum byte length for relation strings.
const MAX_RELATION_BYTES: usize = 256;
/// Maximum byte length for fact strings.
///
/// Also used in this file as the cap for summaries fed to `embed()` and for merged summaries.
const MAX_FACT_BYTES: usize = 2048;

/// Qdrant collection for entity embeddings.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Timeout for a single `embed()` call in seconds.
const EMBED_TIMEOUT_SECS: u64 = 30;
38
/// Outcome of an entity resolution attempt.
///
/// Returned next to the entity id by `EntityResolver::resolve` so callers can tell
/// how the id was obtained.
#[derive(Debug, Clone, PartialEq)]
pub enum ResolutionOutcome {
    /// Exact name+type match in `SQLite` (alias lookup or canonical-name lookup).
    ExactMatch,
    /// Cosine similarity >= merge threshold; score is the cosine similarity value.
    EmbeddingMatch { score: f32 },
    /// LLM confirmed merge in ambiguous similarity range.
    LlmDisambiguated,
    /// New entity was created.
    Created,
}
51
/// LLM response for entity disambiguation.
///
/// Deserialized from the JSON object the model is asked to return,
/// i.e. `{"same_entity": true}` or `{"same_entity": false}`.
#[derive(Debug, Deserialize, JsonSchema)]
struct DisambiguationResponse {
    /// `true` when the model judges both mentions to be the same entity.
    same_entity: bool,
}
57
/// Per-entity-name lock guard to prevent concurrent duplicate creation.
///
/// Keyed by normalized entity name. Entities with different names resolve concurrently;
/// entities with the same name are serialized.
///
/// Each value is a `tokio::sync::Mutex`, so the guard can be held across `.await` points
/// for the whole duration of a resolution.
///
/// TODO(SEC-M33-02): This map grows unboundedly — one entry per unique normalized name.
/// For a short-lived resolver this is acceptable. If the resolver becomes long-lived
/// (stored in `SemanticMemory`), add eviction or use a fixed-size sharded lock array.
type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
67
/// Resolves extracted entity mentions to graph entity ids.
///
/// Borrows the graph store (and, optionally, an embedding store and an LLM provider) for
/// the lifetime `'a`. When either optional part is missing, resolution is limited to the
/// alias / canonical-name lookups followed by creation of a new entity.
pub struct EntityResolver<'a> {
    /// Graph store used for all entity and alias reads/writes.
    store: &'a GraphStore,
    /// Vector store holding entity embeddings; `None` disables embedding-based matching.
    embedding_store: Option<&'a Arc<EmbeddingStore>>,
    /// Provider used for `embed()` and disambiguation chat calls; `None` disables
    /// embedding-based matching.
    provider: Option<&'a AnyProvider>,
    /// Candidates scoring at or above this value are merged without consulting the LLM.
    similarity_threshold: f32,
    /// Candidates scoring at or above this value (and below `similarity_threshold`) are
    /// sent to LLM disambiguation.
    ambiguous_threshold: f32,
    /// Per-normalized-name locks serializing concurrent resolution of the same name.
    name_locks: NameLockMap,
    /// Counter for error-triggered fallbacks (embed/LLM failures). Tests can read this via Arc.
    fallback_count: Arc<std::sync::atomic::AtomicU64>,
    /// Ensures `ensure_named_collection()` is called at most once per resolver lifetime.
    ///
    /// Arc-wrapped so future clones or spawned tasks can share the same gate.
    collection_ensured: Arc<tokio::sync::OnceCell<()>>,
}
82
83impl<'a> EntityResolver<'a> {
/// Returns a reference to the underlying graph store.
///
/// This is the same store the resolver was constructed with.
#[must_use]
pub fn graph_store(&self) -> &GraphStore {
    self.store
}
89
90    #[must_use]
91    pub fn new(store: &'a GraphStore) -> Self {
92        Self {
93            store,
94            embedding_store: None,
95            provider: None,
96            similarity_threshold: 0.85,
97            ambiguous_threshold: 0.70,
98            name_locks: Arc::new(DashMap::new()),
99            fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
100            collection_ensured: Arc::new(tokio::sync::OnceCell::new()),
101        }
102    }
103
104    #[must_use]
105    pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
106        self.embedding_store = Some(store);
107        self
108    }
109
110    #[must_use]
111    pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
112        self.provider = Some(provider);
113        self
114    }
115
116    #[must_use]
117    pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
118        self.similarity_threshold = similarity;
119        self.ambiguous_threshold = ambiguous;
120        self
121    }
122
/// Shared fallback counter — tests can clone this Arc to inspect the value.
///
/// Returns a new handle to the same counter. Within this file it is incremented when
/// `embed()` fails or times out during resolution, when the vector search fails, and
/// when LLM disambiguation yields no verdict.
#[must_use]
pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
    Arc::clone(&self.fallback_count)
}
128
129    /// Normalize an entity name: trim, lowercase, strip control chars, truncate.
130    fn normalize_name(name: &str) -> String {
131        let lowered = name.trim().to_lowercase();
132        let cleaned = strip_control_chars(&lowered);
133        let normalized = truncate_to_bytes_ref(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
134        if normalized.len() < cleaned.len() {
135            tracing::debug!(
136                "graph resolver: entity name truncated to {} bytes",
137                MAX_ENTITY_NAME_BYTES
138            );
139        }
140        normalized
141    }
142
143    /// Parse an entity type string, falling back to `Concept` on unknown values.
144    fn parse_entity_type(entity_type: &str) -> EntityType {
145        entity_type
146            .trim()
147            .to_lowercase()
148            .parse::<EntityType>()
149            .unwrap_or_else(|_| {
150                tracing::debug!(
151                    "graph resolver: unknown entity type {:?}, falling back to Concept",
152                    entity_type
153                );
154                EntityType::Concept
155            })
156    }
157
158    /// Acquire the per-name lock and return the guard. Keeps lock alive for the caller.
159    async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
160        let lock = self
161            .name_locks
162            .entry(normalized.to_owned())
163            .or_insert_with(|| Arc::new(Mutex::new(())))
164            .clone();
165        lock.lock_owned().await
166    }
167
168    /// Resolve an extracted entity using the alias-first canonicalization pipeline.
169    ///
170    /// Pipeline:
171    /// 1. Normalize: trim, lowercase, strip control chars, truncate to 512 bytes.
172    /// 2. Parse entity type (fallback to Concept on unknown).
173    /// 3. Alias lookup: search `graph_entity_aliases` by normalized name + `entity_type`.
174    ///    If found, touch `last_seen_at` and return the existing entity id.
175    /// 4. Canonical name lookup: search `graph_entities` by `canonical_name` + `entity_type`.
176    ///    If found, touch `last_seen_at` and return the existing entity id.
177    /// 5. When `embedding_store` and `provider` are configured, performs embedding-based fuzzy
178    ///    matching: cosine similarity search (Qdrant), LLM disambiguation for ambiguous range,
179    ///    merge or create based on result. Failures degrade gracefully to step 6.
180    /// 6. Create: upsert new entity with `canonical_name` = normalized name.
181    /// 7. Register the normalized form (and original trimmed form if different) as aliases.
182    ///
183    /// # Errors
184    ///
185    /// Returns an error if the entity name is empty after normalization, or if a DB operation fails.
186    pub async fn resolve(
187        &self,
188        name: &str,
189        entity_type: &str,
190        summary: Option<&str>,
191    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
192        let normalized = Self::normalize_name(name);
193
194        if normalized.is_empty() {
195            return Err(MemoryError::GraphStore("empty entity name".into()));
196        }
197
198        if normalized.len() < MIN_ENTITY_NAME_BYTES {
199            return Err(MemoryError::GraphStore(format!(
200                "entity name too short: {normalized:?} ({} bytes, min {MIN_ENTITY_NAME_BYTES})",
201                normalized.len()
202            )));
203        }
204
205        let et = Self::parse_entity_type(entity_type);
206
207        // The surface form preserves the original casing for user-facing display.
208        let surface_name = name.trim().to_owned();
209
210        // Acquire per-name lock to prevent concurrent duplicate creation.
211        let _guard = self.lock_name(&normalized).await;
212
213        // Step 3: alias-first lookup (filters by entity_type to prevent cross-type collisions).
214        if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
215            self.store
216                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
217                .await?;
218            return Ok((entity.id, ResolutionOutcome::ExactMatch));
219        }
220
221        // Step 4: canonical name lookup.
222        if let Some(entity) = self.store.find_entity(&normalized, et).await? {
223            self.store
224                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
225                .await?;
226            return Ok((entity.id, ResolutionOutcome::ExactMatch));
227        }
228
229        // Step 5: Embedding-based resolution (when configured).
230        if let Some(outcome) = self
231            .resolve_via_embedding(&normalized, name, &surface_name, et, summary)
232            .await?
233        {
234            return Ok(outcome);
235        }
236
237        // Step 6: Create new entity (no embedding store, or embedding failure).
238        let entity_id = self
239            .store
240            .upsert_entity(&surface_name, &normalized, et, summary)
241            .await?;
242
243        self.register_aliases(entity_id, &normalized, name).await?;
244
245        Ok((entity_id, ResolutionOutcome::Created))
246    }
247
248    /// Compute embedding for an entity, incrementing `fallback_count` on failure/timeout.
249    /// Returns `None` when embedding is unavailable (caller should skip vector operations).
250    async fn embed_entity_text(
251        &self,
252        provider: &AnyProvider,
253        normalized: &str,
254        summary: Option<&str>,
255    ) -> Option<Vec<f32>> {
256        let safe_summary = truncate_to_bytes_ref(summary.unwrap_or(""), MAX_FACT_BYTES);
257        let embed_text = format!("{normalized}: {safe_summary}");
258        let embed_result = tokio::time::timeout(
259            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
260            provider.embed(&embed_text),
261        )
262        .await;
263        match embed_result {
264            Ok(Ok(v)) => Some(v),
265            Ok(Err(err)) => {
266                self.fallback_count
267                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
268                tracing::warn!(entity_name = %normalized, error = %err,
269                    "embed() failed; falling back to exact-match-only entity creation");
270                None
271            }
272            Err(_timeout) => {
273                self.fallback_count
274                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
275                tracing::warn!(entity_name = %normalized,
276                    "embed() timed out after {}s; falling back to create new entity",
277                    EMBED_TIMEOUT_SECS);
278                None
279            }
280        }
281    }
282
283    /// Handle a candidate in the ambiguous score range by running LLM disambiguation.
284    /// Returns `Ok(Some(...))` if the LLM confirms a match, `Ok(None)` to fall through to create.
285    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
286    async fn handle_ambiguous_candidate(
287        &self,
288        emb_store: &EmbeddingStore,
289        provider: &AnyProvider,
290        payload: &std::collections::HashMap<String, serde_json::Value>,
291        point_id: &str,
292        score: f32,
293        surface_name: &str,
294        normalized: &str,
295        et: EntityType,
296        summary: Option<&str>,
297    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
298        let entity_id = payload
299            .get("entity_id")
300            .and_then(serde_json::Value::as_i64)
301            .ok_or_else(|| MemoryError::GraphStore("missing entity_id in payload".into()))?;
302        let existing_name = payload
303            .get("name")
304            .and_then(|v| v.as_str())
305            .unwrap_or("")
306            .to_owned();
307        let existing_summary = payload
308            .get("summary")
309            .and_then(|v| v.as_str())
310            .unwrap_or("")
311            .to_owned();
312        // Use the existing entity's actual type from the payload (IC-S3)
313        let existing_type = payload
314            .get("entity_type")
315            .and_then(|v| v.as_str())
316            .unwrap_or(et.as_str())
317            .to_owned();
318        let existing_canonical = payload.get("canonical_name").and_then(|v| v.as_str());
319        let existing_summary_str = payload.get("summary").and_then(|v| v.as_str());
320        match self
321            .llm_disambiguate(
322                provider,
323                normalized,
324                et.as_str(),
325                summary.unwrap_or(""),
326                &existing_name,
327                &existing_type,
328                &existing_summary,
329                score,
330            )
331            .await
332        {
333            Some(true) => {
334                self.merge_entity(
335                    emb_store,
336                    provider,
337                    entity_id,
338                    surface_name,
339                    normalized,
340                    et,
341                    summary,
342                    existing_canonical,
343                    existing_summary_str,
344                    Some(point_id),
345                )
346                .await?;
347                Ok(Some((entity_id, ResolutionOutcome::LlmDisambiguated)))
348            }
349            Some(false) => Ok(None),
350            None => {
351                self.fallback_count
352                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
353                tracing::warn!(entity_name = %normalized,
354                    "LLM disambiguation failed; falling back to create new entity");
355                Ok(None)
356            }
357        }
358    }
359
360    /// Attempt embedding-based resolution. Returns `Ok(Some(...))` if resolved (early return),
361    /// `Ok(None)` if no match found (caller should fall through to create), or `Err` on DB error.
362    async fn resolve_via_embedding(
363        &self,
364        normalized: &str,
365        original_name: &str,
366        surface_name: &str,
367        et: EntityType,
368        summary: Option<&str>,
369    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
370        let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) else {
371            return Ok(None);
372        };
373
374        let Some(query_vec) = self.embed_entity_text(provider, normalized, summary).await else {
375            return Ok(None);
376        };
377
378        let type_filter = VectorFilter {
379            must: vec![FieldCondition {
380                field: "entity_type".into(),
381                value: FieldValue::Text(et.as_str().to_owned()),
382            }],
383            must_not: vec![],
384        };
385        let candidates = match emb_store
386            .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
387            .await
388        {
389            Ok(c) => c,
390            Err(err) => {
391                self.fallback_count
392                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
393                tracing::warn!(entity_name = %normalized, error = %err,
394                    "Qdrant search failed; falling back to create new entity");
395                return self
396                    .create_with_embedding(
397                        emb_store,
398                        surface_name,
399                        normalized,
400                        original_name,
401                        et,
402                        summary,
403                        &query_vec,
404                    )
405                    .await
406                    .map(Some);
407            }
408        };
409
410        if let Some(best) = candidates.first() {
411            let score = best.score;
412            if score >= self.similarity_threshold {
413                let entity_id = best
414                    .payload
415                    .get("entity_id")
416                    .and_then(serde_json::Value::as_i64)
417                    .ok_or_else(|| {
418                        MemoryError::GraphStore("missing entity_id in payload".into())
419                    })?;
420                let existing_canonical =
421                    best.payload.get("canonical_name").and_then(|v| v.as_str());
422                let existing_summary = best.payload.get("summary").and_then(|v| v.as_str());
423                let existing_pid = Some(best.id.as_str());
424                self.merge_entity(
425                    emb_store,
426                    provider,
427                    entity_id,
428                    surface_name,
429                    normalized,
430                    et,
431                    summary,
432                    existing_canonical,
433                    existing_summary,
434                    existing_pid,
435                )
436                .await?;
437                return Ok(Some((
438                    entity_id,
439                    ResolutionOutcome::EmbeddingMatch { score },
440                )));
441            } else if score >= self.ambiguous_threshold
442                && let Some(result) = self
443                    .handle_ambiguous_candidate(
444                        emb_store,
445                        provider,
446                        &best.payload,
447                        &best.id,
448                        score,
449                        surface_name,
450                        normalized,
451                        et,
452                        summary,
453                    )
454                    .await?
455            {
456                return Ok(Some(result));
457            }
458            // score < ambiguous_threshold or LLM said different: fall through to create with embedding
459        }
460
461        // No suitable match — create new entity and store embedding.
462        self.create_with_embedding(
463            emb_store,
464            surface_name,
465            normalized,
466            original_name,
467            et,
468            summary,
469            &query_vec,
470        )
471        .await
472        .map(Some)
473    }
474
475    /// Create a new entity, register aliases, and store its embedding in Qdrant.
476    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
477    async fn create_with_embedding(
478        &self,
479        emb_store: &EmbeddingStore,
480        surface_name: &str,
481        normalized: &str,
482        original_name: &str,
483        et: EntityType,
484        summary: Option<&str>,
485        query_vec: &[f32],
486    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
487        let entity_id = self
488            .store
489            .upsert_entity(surface_name, normalized, et, summary)
490            .await?;
491        self.register_aliases(entity_id, normalized, original_name)
492            .await?;
493        self.store_entity_embedding(
494            emb_store,
495            entity_id,
496            None,
497            normalized,
498            et,
499            summary.unwrap_or(""),
500            query_vec,
501        )
502        .await;
503        Ok((entity_id, ResolutionOutcome::Created))
504    }
505
506    /// Register the normalized form and original trimmed form as aliases for an entity.
507    async fn register_aliases(
508        &self,
509        entity_id: i64,
510        normalized: &str,
511        original_name: &str,
512    ) -> Result<(), MemoryError> {
513        self.store.add_alias(entity_id, normalized).await?;
514
515        // Also register the original trimmed lowercased form if it differs from normalized
516        // (e.g. when control chars were stripped, leaving a shorter string).
517        let original_trimmed = original_name.trim().to_lowercase();
518        let original_clean_str = strip_control_chars(&original_trimmed);
519        let original_clean = truncate_to_bytes_ref(&original_clean_str, MAX_ENTITY_NAME_BYTES);
520        if original_clean != normalized {
521            self.store.add_alias(entity_id, original_clean).await?;
522        }
523
524        Ok(())
525    }
526
527    /// Merge an existing entity with new information: combine summaries, update Qdrant.
528    ///
529    /// `existing_canonical_name` and `existing_summary` are read from the Qdrant payload at
530    /// the call site (hot path, no `SQLite` roundtrip). Pass `None` for either when the point
531    /// predates the payload fields — a targeted `find_entity_by_id` read is then used as a
532    /// one-time fallback (legacy transition path, removed after all points are rewritten).
533    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
534    async fn merge_entity(
535        &self,
536        emb_store: &EmbeddingStore,
537        provider: &AnyProvider,
538        entity_id: i64,
539        new_surface_name: &str,
540        new_canonical_name: &str,
541        entity_type: EntityType,
542        new_summary: Option<&str>,
543        existing_canonical_name: Option<&str>,
544        existing_summary_payload: Option<&str>,
545        existing_point_id: Option<&str>,
546    ) -> Result<(), MemoryError> {
547        // Hot path: use values from the Qdrant payload (no SQLite roundtrip).
548        // Both canonical_name AND summary must be present; if either is absent the point
549        // predates the payload field — fall back to a targeted SQLite read to avoid
550        // silently dropping a summary that was written outside merge_entity.
551        let (existing_canonical, existing_summary, existing_point_id_owned) =
552            if existing_canonical_name.is_some() && existing_summary_payload.is_some() {
553                (
554                    existing_canonical_name
555                        .unwrap_or(new_canonical_name)
556                        .to_owned(),
557                    existing_summary_payload.unwrap_or("").to_owned(),
558                    existing_point_id.map(ToOwned::to_owned),
559                )
560            } else {
561                // Transition-period fallback. Legacy Qdrant points pre-date the payload
562                // fields; one targeted read is acceptable until the embedding is rewritten
563                // on the next merge. Also used when canonical_name is present but summary
564                // is absent — prevents overwriting a SQLite summary with empty string.
565                let existing = self.store.find_entity_by_id(entity_id).await?;
566                let canonical = existing_canonical_name.map_or_else(
567                    || {
568                        existing.as_ref().map_or_else(
569                            || new_canonical_name.to_owned(),
570                            |e| e.canonical_name.clone(),
571                        )
572                    },
573                    ToOwned::to_owned,
574                );
575                let summary = existing
576                    .as_ref()
577                    .and_then(|e| e.summary.as_deref())
578                    .unwrap_or("")
579                    .to_owned();
580                let pid = existing_point_id.map(ToOwned::to_owned).or_else(|| {
581                    existing
582                        .as_ref()
583                        .and_then(|e| e.qdrant_point_id.as_deref())
584                        .map(ToOwned::to_owned)
585                });
586                (canonical, summary, pid)
587            };
588
589        let merged_summary = if let Some(new) = new_summary {
590            if !new.is_empty() && !existing_summary.is_empty() {
591                let combined = format!("{existing_summary}; {new}");
592                // TODO(S2): use LLM-based summary merge when summary exceeds 512 bytes
593                truncate_to_bytes_ref(&combined, MAX_FACT_BYTES).to_owned()
594            } else if !new.is_empty() {
595                new.to_owned()
596            } else {
597                existing_summary.clone()
598            }
599        } else {
600            existing_summary.clone()
601        };
602
603        let summary_opt = if merged_summary.is_empty() {
604            None
605        } else {
606            Some(merged_summary.as_str())
607        };
608
609        // Preserve the existing display name from the payload; fall back to the incoming surface
610        // name only for brand-new entities (where the payload had no "name" field).
611        self.store
612            .upsert_entity(
613                new_surface_name,
614                &existing_canonical,
615                entity_type,
616                summary_opt,
617            )
618            .await?;
619
620        // Re-embed merged text and upsert to Qdrant
621        let embed_text = format!("{new_surface_name}: {merged_summary}");
622        let embed_result = tokio::time::timeout(
623            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
624            provider.embed(&embed_text),
625        )
626        .await;
627
628        match embed_result {
629            Ok(Ok(vec)) => {
630                self.store_entity_embedding(
631                    emb_store,
632                    entity_id,
633                    existing_point_id_owned.as_deref(),
634                    new_surface_name,
635                    entity_type,
636                    &merged_summary,
637                    &vec,
638                )
639                .await;
640            }
641            Ok(Err(err)) => {
642                tracing::warn!(
643                    entity_id,
644                    error = %err,
645                    "merge re-embed failed; Qdrant entry may be stale"
646                );
647            }
648            Err(_) => {
649                tracing::warn!(
650                    entity_id,
651                    "merge re-embed timed out; Qdrant entry may be stale"
652                );
653            }
654        }
655
656        Ok(())
657    }
658
659    /// Store an entity embedding in Qdrant and update `qdrant_point_id` in `SQLite`.
660    ///
661    /// When `existing_point_id` is `Some`, the existing Qdrant point is updated in-place
662    /// (upsert by ID) to avoid orphaned stale points. When `None`, a new point is created.
663    ///
664    /// Failures are logged at warn level but do not propagate — the entity is still
665    /// valid in `SQLite` even if Qdrant upsert fails.
666    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
667    async fn store_entity_embedding(
668        &self,
669        emb_store: &EmbeddingStore,
670        entity_id: i64,
671        existing_point_id: Option<&str>,
672        name: &str,
673        entity_type: EntityType,
674        summary: &str,
675        vector: &[f32],
676    ) {
677        // Ensure the Qdrant collection exists exactly once per resolver lifetime.
678        // All entity embeddings use the same model dimension, so the first call's
679        // vector_size wins — subsequent entities share the same collection.
680        // On error the cell stays unset, so the next entity retries — transient
681        // network failures do not permanently disable embedding storage.
682        let vector_size = u64::try_from(vector.len()).unwrap_or(384);
683        let collection_ensured = Arc::clone(&self.collection_ensured);
684        if let Err(err) = collection_ensured
685            .get_or_try_init(|| async {
686                emb_store
687                    .ensure_named_collection(ENTITY_COLLECTION, vector_size)
688                    .await
689            })
690            .await
691        {
692            tracing::error!(
693                error = %err,
694                "failed to ensure entity embedding collection; skipping Qdrant upsert"
695            );
696            return;
697        }
698
699        let payload = serde_json::json!({
700            "entity_id": entity_id,
701            // String mirror of entity_id for scroll_all enumeration (scroll_all only surfaces
702            // StringValue payload fields; the i64 entity_id is preserved for existing search
703            // consumers which read it directly from ScoredVectorPoint.payload).
704            "entity_id_str": entity_id.to_string(),
705            "canonical_name": name,
706            "name": name,
707            "entity_type": entity_type.as_str(),
708            "summary": summary,
709        });
710
711        if let Some(point_id) = existing_point_id {
712            // Reuse existing point to avoid orphaned stale points (IC-S1)
713            if let Err(err) = emb_store
714                .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
715                .await
716            {
717                tracing::warn!(
718                    entity_id,
719                    error = %err,
720                    "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
721                );
722            }
723        } else {
724            match emb_store
725                .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
726                .await
727            {
728                Ok(point_id) => {
729                    if let Err(err) = self
730                        .store
731                        .set_entity_qdrant_point_id(entity_id, &point_id)
732                        .await
733                    {
734                        tracing::warn!(
735                            entity_id,
736                            error = %err,
737                            "failed to store qdrant_point_id in SQLite"
738                        );
739                    }
740                }
741                Err(err) => {
742                    tracing::warn!(
743                        entity_id,
744                        error = %err,
745                        "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
746                    );
747                }
748            }
749        }
750    }
751
752    /// Ask the LLM whether two entities are the same.
753    ///
754    /// Returns `Some(true)` for merge, `Some(false)` for separate, `None` on failure.
755    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
756    async fn llm_disambiguate(
757        &self,
758        provider: &AnyProvider,
759        new_name: &str,
760        new_type: &str,
761        new_summary: &str,
762        existing_name: &str,
763        existing_type: &str,
764        existing_summary: &str,
765        score: f32,
766    ) -> Option<bool> {
767        let prompt = format!(
768            "New entity:\n\
769             - Name: <external-data>{new_name}</external-data>\n\
770             - Type: <external-data>{new_type}</external-data>\n\
771             - Summary: <external-data>{new_summary}</external-data>\n\
772             \n\
773             Existing entity:\n\
774             - Name: <external-data>{existing_name}</external-data>\n\
775             - Type: <external-data>{existing_type}</external-data>\n\
776             - Summary: <external-data>{existing_summary}</external-data>\n\
777             \n\
778             Cosine similarity: {score:.2}\n\
779             \n\
780             Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
781        );
782
783        let messages = [
784            Message::from_legacy(
785                Role::System,
786                "You are an entity disambiguation assistant. Given a new entity mention and \
787                 an existing entity from the knowledge graph, determine if they refer to the same \
788                 real-world entity. Respond only with JSON.",
789            ),
790            Message::from_legacy(Role::User, prompt),
791        ];
792
793        let response = match provider.chat(&messages).await {
794            Ok(r) => r,
795            Err(err) => {
796                tracing::warn!(error = %err, "LLM disambiguation chat failed");
797                return None;
798            }
799        };
800
801        // Parse JSON response, tolerating markdown code fences
802        let json_str = extract_json(&response);
803        match serde_json::from_str::<DisambiguationResponse>(json_str) {
804            Ok(parsed) => Some(parsed.same_entity),
805            Err(err) => {
806                tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
807                None
808            }
809        }
810    }
811
812    /// Resolve a batch of extracted entities concurrently.
813    ///
814    /// Returns a `Vec` of `(entity_id, ResolutionOutcome)` in the same order as input.
815    ///
816    /// # Errors
817    ///
818    /// Returns an error if any DB operation fails.
819    ///
820    /// # Panics
821    ///
822    /// Panics if an internal stream collection bug causes a result index to be missing.
823    /// This indicates a programming error and should never occur in correct usage.
824    pub async fn resolve_batch(
825        &self,
826        entities: &[ExtractedEntity],
827    ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
828        if entities.is_empty() {
829            return Ok(Vec::new());
830        }
831
832        // Process up to 4 embed+resolve operations concurrently (IC-S2/PERF-01).
833        let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
834
835        let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
836            let name = e.name.clone();
837            let entity_type = e.entity_type.clone();
838            let summary = e.summary.clone();
839            async move {
840                let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
841                (i, result)
842            }
843        }))
844        .buffer_unordered(4);
845
846        while let Some((i, result)) = stream.next().await {
847            match result {
848                Ok(outcome) => results[i] = Some(outcome),
849                Err(err) => return Err(err),
850            }
851        }
852
853        Ok(results
854            .into_iter()
855            .enumerate()
856            .map(|(i, r)| {
857                r.unwrap_or_else(|| {
858                    tracing::warn!(
859                        index = i,
860                        "resolve_batch: missing result at index — bug in stream collection"
861                    );
862                    panic!("resolve_batch: missing result at index {i}")
863                })
864            })
865            .collect())
866    }
867
868    /// Resolve an extracted edge: deduplicate or supersede existing edges.
869    ///
870    /// - If an active edge with the same direction and relation exists with an identical fact,
871    ///   returns `None` (deduplicated).
872    /// - If an active edge with the same direction and relation exists with a different fact,
873    ///   invalidates the old edge and inserts the new one, returning `Some(new_id)`.
874    /// - If no matching edge exists, inserts a new edge and returns `Some(new_id)`.
875    ///
876    /// Relation and fact strings are sanitized (control chars stripped, length-capped).
877    ///
878    /// # Errors
879    ///
880    /// Returns an error if any database operation fails.
881    pub async fn resolve_edge(
882        &self,
883        source_id: i64,
884        target_id: i64,
885        relation: &str,
886        fact: &str,
887        confidence: f32,
888        episode_id: Option<MessageId>,
889    ) -> Result<Option<i64>, MemoryError> {
890        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
891        let normalized_relation =
892            truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
893
894        let fact_clean = strip_control_chars(fact.trim());
895        let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
896
897        // Fetch only exact-direction edges — no reverse edges to filter out
898        let existing_edges = self.store.edges_exact(source_id, target_id).await?;
899
900        let matching = existing_edges
901            .iter()
902            .find(|e| e.relation == normalized_relation);
903
904        if let Some(old) = matching {
905            if old.fact == normalized_fact {
906                // Exact duplicate — skip
907                return Ok(None);
908            }
909            // Same relation, different fact — supersede
910            self.store.invalidate_edge(old.id).await?;
911        }
912
913        let new_id = self
914            .store
915            .insert_edge(
916                source_id,
917                target_id,
918                &normalized_relation,
919                &normalized_fact,
920                confidence,
921                episode_id,
922            )
923            .await?;
924        Ok(Some(new_id))
925    }
926
927    /// Resolve a typed edge: deduplicate or supersede existing edges of the same type.
928    ///
929    /// Identical to [`Self::resolve_edge`] but includes `edge_type` in the matching key.
930    /// An active edge with the same `(source, target, relation, edge_type)` and identical
931    /// fact returns `None`; same relation+type with different fact is superseded.
932    ///
933    /// When `belief_revision` is `Some`, uses semantic contradiction detection to find edges
934    /// to supersede across the same relation domain. The new fact embedding is pre-computed
935    /// here (one embed call) to avoid N+1 embedding calls.
936    ///
937    /// This ensures that different MAGMA edge types for the same entity pair are stored
938    /// independently (critic mitigation: dedup key includes `edge_type`).
939    ///
940    /// # Errors
941    ///
942    /// Returns an error if any database operation fails.
943    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
944    pub async fn resolve_edge_typed(
945        &self,
946        source_id: i64,
947        target_id: i64,
948        relation: &str,
949        fact: &str,
950        confidence: f32,
951        episode_id: Option<crate::types::MessageId>,
952        edge_type: crate::graph::EdgeType,
953        belief_revision: Option<&crate::graph::BeliefRevisionConfig>,
954    ) -> Result<Option<i64>, MemoryError> {
955        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
956        let normalized_relation =
957            truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
958
959        let fact_clean = strip_control_chars(fact.trim());
960        let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
961
962        let existing_edges = self.store.edges_exact(source_id, target_id).await?;
963
964        // Exact dedup: same (relation, edge_type, fact) → skip.
965        let matching = existing_edges
966            .iter()
967            .find(|e| e.relation == normalized_relation && e.edge_type == edge_type);
968
969        if matching.is_some_and(|old| old.fact == normalized_fact) {
970            return Ok(None);
971        }
972
973        // Determine which edges to supersede.
974        let superseded_ids: Vec<i64> = if let (Some(cfg), Some(provider)) =
975            (belief_revision, self.provider)
976        {
977            // Kumiho belief revision: embed new fact once, find semantically contradicted edges.
978            match tokio::time::timeout(
979                std::time::Duration::from_secs(5),
980                provider.embed(&normalized_fact),
981            )
982            .await
983            {
984                Ok(Ok(new_emb)) => {
985                    match crate::graph::belief_revision::find_superseded_edges(
986                        &existing_edges,
987                        &new_emb,
988                        &normalized_relation,
989                        edge_type,
990                        provider,
991                        cfg,
992                    )
993                    .await
994                    {
995                        Ok(ids) => ids,
996                        Err(err) => {
997                            tracing::warn!(error = %err,
998                                    "belief_revision: find_superseded_edges failed, falling back to exact match");
999                            matching.map(|e| vec![e.id]).unwrap_or_default()
1000                        }
1001                    }
1002                }
1003                Ok(Err(err)) => {
1004                    tracing::warn!(error = %err,
1005                            "belief_revision: embed new fact failed, falling back to exact match");
1006                    matching.map(|e| vec![e.id]).unwrap_or_default()
1007                }
1008                Err(_) => {
1009                    tracing::warn!(
1010                        "belief_revision: embed new fact timed out, falling back to exact match"
1011                    );
1012                    matching.map(|e| vec![e.id]).unwrap_or_default()
1013                }
1014            }
1015        } else {
1016            // Legacy: exact (relation, edge_type) match with different fact.
1017            matching.map(|e| vec![e.id]).unwrap_or_default()
1018        };
1019
1020        let new_id = self
1021            .store
1022            .insert_edge_typed(
1023                source_id,
1024                target_id,
1025                &normalized_relation,
1026                &normalized_fact,
1027                confidence,
1028                episode_id,
1029                edge_type,
1030            )
1031            .await?;
1032
1033        // Supersede old edges with audit trail (belief revision) or plain invalidation (legacy).
1034        for old_id in superseded_ids {
1035            if belief_revision.is_some() {
1036                self.store
1037                    .invalidate_edge_with_supersession(old_id, new_id)
1038                    .await?;
1039            } else {
1040                self.store.invalidate_edge(old_id).await?;
1041            }
1042        }
1043
1044        Ok(Some(new_id))
1045    }
1046}
1047
/// Extract a JSON object from an LLM reply that may be wrapped in markdown code fences.
///
/// The input is trimmed, then handled as follows:
///
/// 1. If it starts with a fence (```` ```json ```` or a bare ```` ``` ````) and contains a
///    closing fence, the text between the opening fence and the *last* closing fence is
///    taken as the body. A body that already begins with `{` or `[` is returned as is.
///    Otherwise (for example when a language tag such as `JSON` was left in front of the
///    payload) the body is narrowed to the span from its first `{` to its last `}`; if
///    it has no such span the body is returned unchanged.
/// 2. Without a complete fence, the span from the first `{` to the last `}` of the
///    trimmed input is returned.
/// 3. If there is no such span either, the trimmed input is returned unchanged.
///
/// The result always borrows from `s`; no validation of the JSON itself is performed.
fn extract_json(s: &str) -> &str {
    /// Slice from the first `{` to the last `}` (inclusive), if they appear in that order.
    fn brace_span(text: &str) -> Option<&str> {
        let start = text.find('{')?;
        let end = text.rfind('}')?;
        (start <= end).then(|| &text[start..=end])
    }

    let trimmed = s.trim();

    // "```json" is tried before the bare fence so the tag does not end up in the body.
    let fenced = ["```json", "```"].iter().find_map(|opener| {
        let inner = trimmed.strip_prefix(*opener)?;
        let close = inner.rfind("```")?;
        Some(inner[..close].trim())
    });

    match fenced {
        // Body already looks like a JSON value: hand it back untouched.
        Some(body) if body.starts_with('{') || body.starts_with('[') => body,
        // Something (e.g. an unrecognised language tag) precedes the payload.
        Some(body) => brace_span(body).unwrap_or(body),
        // No complete fence: look for a brace-delimited span in the raw text.
        None => brace_span(trimmed).unwrap_or(trimmed),
    }
}
1070
1071#[cfg(test)]
1072mod tests;