1use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_llm::any::AnyProvider;
12use zeph_llm::provider::{LlmProvider as _, Message, Role};
13
14use super::store::GraphStore;
15use super::types::EntityType;
16use crate::embedding_store::EmbeddingStore;
17use crate::error::MemoryError;
18use crate::graph::extractor::ExtractedEntity;
19use crate::types::MessageId;
20use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
21
/// Byte cap for a normalized entity name; longer names are cut at a UTF-8 char boundary.
const MAX_ENTITY_NAME_BYTES: usize = 512;
/// Byte cap for a normalized edge relation label.
const MAX_RELATION_BYTES: usize = 256;
/// Byte cap for edge facts and for entity summaries (when embedded or merged).
const MAX_FACT_BYTES: usize = 2_048;

/// Vector-store collection that holds the entity embedding points.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Seconds to wait on an embedding request before treating it as failed.
const EMBED_TIMEOUT_SECS: u64 = 30;
34
/// Returns `s` without control characters or BiDi formatting code points.
///
/// Removed: every `char::is_control` character (general category Cc, e.g. NUL, tab,
/// newline), the BiDi embeddings/overrides U+202A..=U+202E and the BiDi isolates
/// U+2066..=U+2069. Other format characters (e.g. U+200E/U+200F) are kept.
fn strip_control_chars(s: &str) -> String {
    let mut kept = String::with_capacity(s.len());
    for ch in s.chars() {
        let bidi_formatting = matches!(ch, '\u{202A}'..='\u{202E}' | '\u{2066}'..='\u{2069}');
        if ch.is_control() || bidi_formatting {
            continue;
        }
        kept.push(ch);
    }
    kept
}
41
/// Returns the longest prefix of `s` that is at most `max_bytes` long and ends on a UTF-8
/// character boundary (so a multi-byte character is never split).
fn truncate_to_bytes(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Scan down from the limit; index 0 is always a char boundary, so a cut point exists.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
53
/// How `EntityResolver::resolve` mapped a mention onto a graph entity.
#[derive(Clone, Debug, PartialEq)]
pub enum ResolutionOutcome {
    /// An existing entity matched by alias or by canonical name.
    ExactMatch,
    /// Merged into the nearest embedding neighbour; `score` is that neighbour's similarity,
    /// which met the resolver's similarity threshold.
    EmbeddingMatch { score: f32 },
    /// An ambiguous embedding neighbour that the LLM confirmed as the same entity.
    LlmDisambiguated,
    /// A new entity was created.
    Created,
}
66
67#[derive(Debug, Deserialize, JsonSchema)]
69struct DisambiguationResponse {
70 same_entity: bool,
71}
72
73type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
82
83pub struct EntityResolver<'a> {
84 store: &'a GraphStore,
85 embedding_store: Option<&'a Arc<EmbeddingStore>>,
86 provider: Option<&'a AnyProvider>,
87 similarity_threshold: f32,
88 ambiguous_threshold: f32,
89 name_locks: NameLockMap,
90 fallback_count: Arc<std::sync::atomic::AtomicU64>,
92}
93
94impl<'a> EntityResolver<'a> {
95 #[must_use]
96 pub fn new(store: &'a GraphStore) -> Self {
97 Self {
98 store,
99 embedding_store: None,
100 provider: None,
101 similarity_threshold: 0.85,
102 ambiguous_threshold: 0.70,
103 name_locks: Arc::new(DashMap::new()),
104 fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
105 }
106 }
107
108 #[must_use]
109 pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
110 self.embedding_store = Some(store);
111 self
112 }
113
114 #[must_use]
115 pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
116 self.provider = Some(provider);
117 self
118 }
119
120 #[must_use]
121 pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
122 self.similarity_threshold = similarity;
123 self.ambiguous_threshold = ambiguous;
124 self
125 }
126
127 #[must_use]
129 pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
130 Arc::clone(&self.fallback_count)
131 }
132
133 fn normalize_name(name: &str) -> String {
135 let lowered = name.trim().to_lowercase();
136 let cleaned = strip_control_chars(&lowered);
137 let normalized = truncate_to_bytes(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
138 if normalized.len() < cleaned.len() {
139 tracing::debug!(
140 "graph resolver: entity name truncated to {} bytes",
141 MAX_ENTITY_NAME_BYTES
142 );
143 }
144 normalized
145 }
146
147 fn parse_entity_type(entity_type: &str) -> EntityType {
149 entity_type
150 .trim()
151 .to_lowercase()
152 .parse::<EntityType>()
153 .unwrap_or_else(|_| {
154 tracing::debug!(
155 "graph resolver: unknown entity type {:?}, falling back to Concept",
156 entity_type
157 );
158 EntityType::Concept
159 })
160 }
161
162 async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
164 let lock = self
165 .name_locks
166 .entry(normalized.to_owned())
167 .or_insert_with(|| Arc::new(Mutex::new(())))
168 .clone();
169 lock.lock_owned().await
170 }
171
172 pub async fn resolve(
191 &self,
192 name: &str,
193 entity_type: &str,
194 summary: Option<&str>,
195 ) -> Result<(i64, ResolutionOutcome), MemoryError> {
196 let normalized = Self::normalize_name(name);
197
198 if normalized.is_empty() {
199 return Err(MemoryError::GraphStore("empty entity name".into()));
200 }
201
202 let et = Self::parse_entity_type(entity_type);
203
204 let surface_name = name.trim().to_owned();
206
207 let _guard = self.lock_name(&normalized).await;
209
210 if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
212 self.store
213 .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
214 .await?;
215 return Ok((entity.id, ResolutionOutcome::ExactMatch));
216 }
217
218 if let Some(entity) = self.store.find_entity(&normalized, et).await? {
220 self.store
221 .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
222 .await?;
223 return Ok((entity.id, ResolutionOutcome::ExactMatch));
224 }
225
226 if let Some(outcome) = self
228 .resolve_via_embedding(&normalized, name, &surface_name, et, summary)
229 .await?
230 {
231 return Ok(outcome);
232 }
233
234 let entity_id = self
236 .store
237 .upsert_entity(&surface_name, &normalized, et, summary)
238 .await?;
239
240 self.register_aliases(entity_id, &normalized, name).await?;
241
242 Ok((entity_id, ResolutionOutcome::Created))
243 }
244
245 async fn embed_entity_text(
248 &self,
249 provider: &AnyProvider,
250 normalized: &str,
251 summary: Option<&str>,
252 ) -> Option<Vec<f32>> {
253 let safe_summary = truncate_to_bytes(summary.unwrap_or(""), MAX_FACT_BYTES);
254 let embed_text = format!("{normalized}: {safe_summary}");
255 let embed_result = tokio::time::timeout(
256 std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
257 provider.embed(&embed_text),
258 )
259 .await;
260 match embed_result {
261 Ok(Ok(v)) => Some(v),
262 Ok(Err(err)) => {
263 self.fallback_count
264 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
265 tracing::warn!(entity_name = %normalized, error = %err,
266 "embed() failed; falling back to exact-match-only entity creation");
267 None
268 }
269 Err(_timeout) => {
270 self.fallback_count
271 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
272 tracing::warn!(entity_name = %normalized,
273 "embed() timed out after {}s; falling back to create new entity",
274 EMBED_TIMEOUT_SECS);
275 None
276 }
277 }
278 }
279
280 #[allow(clippy::too_many_arguments)]
283 async fn handle_ambiguous_candidate(
284 &self,
285 emb_store: &EmbeddingStore,
286 provider: &AnyProvider,
287 payload: &std::collections::HashMap<String, serde_json::Value>,
288 score: f32,
289 surface_name: &str,
290 normalized: &str,
291 et: EntityType,
292 summary: Option<&str>,
293 ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
294 let entity_id = payload
295 .get("entity_id")
296 .and_then(serde_json::Value::as_i64)
297 .ok_or_else(|| MemoryError::GraphStore("missing entity_id in payload".into()))?;
298 let existing_name = payload
299 .get("name")
300 .and_then(|v| v.as_str())
301 .unwrap_or("")
302 .to_owned();
303 let existing_summary = payload
304 .get("summary")
305 .and_then(|v| v.as_str())
306 .unwrap_or("")
307 .to_owned();
308 let existing_type = payload
310 .get("entity_type")
311 .and_then(|v| v.as_str())
312 .unwrap_or(et.as_str())
313 .to_owned();
314 match self
315 .llm_disambiguate(
316 provider,
317 normalized,
318 et.as_str(),
319 summary.unwrap_or(""),
320 &existing_name,
321 &existing_type,
322 &existing_summary,
323 score,
324 )
325 .await
326 {
327 Some(true) => {
328 self.merge_entity(
329 emb_store,
330 provider,
331 entity_id,
332 surface_name,
333 normalized,
334 et,
335 summary,
336 )
337 .await?;
338 Ok(Some((entity_id, ResolutionOutcome::LlmDisambiguated)))
339 }
340 Some(false) => Ok(None),
341 None => {
342 self.fallback_count
343 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
344 tracing::warn!(entity_name = %normalized,
345 "LLM disambiguation failed; falling back to create new entity");
346 Ok(None)
347 }
348 }
349 }
350
351 async fn resolve_via_embedding(
354 &self,
355 normalized: &str,
356 original_name: &str,
357 surface_name: &str,
358 et: EntityType,
359 summary: Option<&str>,
360 ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
361 let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) else {
362 return Ok(None);
363 };
364
365 let Some(query_vec) = self.embed_entity_text(provider, normalized, summary).await else {
366 return Ok(None);
367 };
368
369 let type_filter = VectorFilter {
370 must: vec![FieldCondition {
371 field: "entity_type".into(),
372 value: FieldValue::Text(et.as_str().to_owned()),
373 }],
374 must_not: vec![],
375 };
376 let candidates = match emb_store
377 .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
378 .await
379 {
380 Ok(c) => c,
381 Err(err) => {
382 self.fallback_count
383 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
384 tracing::warn!(entity_name = %normalized, error = %err,
385 "Qdrant search failed; falling back to create new entity");
386 return self
387 .create_with_embedding(
388 emb_store,
389 surface_name,
390 normalized,
391 original_name,
392 et,
393 summary,
394 &query_vec,
395 )
396 .await
397 .map(Some);
398 }
399 };
400
401 if let Some(best) = candidates.first() {
402 let score = best.score;
403 if score >= self.similarity_threshold {
404 let entity_id = best
405 .payload
406 .get("entity_id")
407 .and_then(serde_json::Value::as_i64)
408 .ok_or_else(|| {
409 MemoryError::GraphStore("missing entity_id in payload".into())
410 })?;
411 self.merge_entity(
412 emb_store,
413 provider,
414 entity_id,
415 surface_name,
416 normalized,
417 et,
418 summary,
419 )
420 .await?;
421 return Ok(Some((
422 entity_id,
423 ResolutionOutcome::EmbeddingMatch { score },
424 )));
425 } else if score >= self.ambiguous_threshold
426 && let Some(result) = self
427 .handle_ambiguous_candidate(
428 emb_store,
429 provider,
430 &best.payload,
431 score,
432 surface_name,
433 normalized,
434 et,
435 summary,
436 )
437 .await?
438 {
439 return Ok(Some(result));
440 }
441 }
443
444 self.create_with_embedding(
446 emb_store,
447 surface_name,
448 normalized,
449 original_name,
450 et,
451 summary,
452 &query_vec,
453 )
454 .await
455 .map(Some)
456 }
457
458 #[allow(clippy::too_many_arguments)]
460 async fn create_with_embedding(
461 &self,
462 emb_store: &EmbeddingStore,
463 surface_name: &str,
464 normalized: &str,
465 original_name: &str,
466 et: EntityType,
467 summary: Option<&str>,
468 query_vec: &[f32],
469 ) -> Result<(i64, ResolutionOutcome), MemoryError> {
470 let entity_id = self
471 .store
472 .upsert_entity(surface_name, normalized, et, summary)
473 .await?;
474 self.register_aliases(entity_id, normalized, original_name)
475 .await?;
476 self.store_entity_embedding(
477 emb_store,
478 entity_id,
479 None,
480 normalized,
481 et,
482 summary.unwrap_or(""),
483 query_vec,
484 )
485 .await;
486 Ok((entity_id, ResolutionOutcome::Created))
487 }
488
489 async fn register_aliases(
491 &self,
492 entity_id: i64,
493 normalized: &str,
494 original_name: &str,
495 ) -> Result<(), MemoryError> {
496 self.store.add_alias(entity_id, normalized).await?;
497
498 let original_trimmed = original_name.trim().to_lowercase();
501 let original_clean_str = strip_control_chars(&original_trimmed);
502 let original_clean = truncate_to_bytes(&original_clean_str, MAX_ENTITY_NAME_BYTES);
503 if original_clean != normalized {
504 self.store.add_alias(entity_id, original_clean).await?;
505 }
506
507 Ok(())
508 }
509
510 #[allow(clippy::too_many_arguments)]
512 async fn merge_entity(
513 &self,
514 emb_store: &EmbeddingStore,
515 provider: &AnyProvider,
516 entity_id: i64,
517 new_surface_name: &str,
518 new_canonical_name: &str,
519 entity_type: EntityType,
520 new_summary: Option<&str>,
521 ) -> Result<(), MemoryError> {
522 let existing = self.store.find_entity_by_id(entity_id).await?;
525 let existing_summary = existing
526 .as_ref()
527 .and_then(|e| e.summary.as_deref())
528 .unwrap_or("");
529
530 let merged_summary = if let Some(new) = new_summary {
531 if !new.is_empty() && !existing_summary.is_empty() {
532 let combined = format!("{existing_summary}; {new}");
533 truncate_to_bytes(&combined, MAX_FACT_BYTES).to_owned()
535 } else if !new.is_empty() {
536 new.to_owned()
537 } else {
538 existing_summary.to_owned()
539 }
540 } else {
541 existing_summary.to_owned()
542 };
543
544 let summary_opt = if merged_summary.is_empty() {
545 None
546 } else {
547 Some(merged_summary.as_str())
548 };
549
550 let existing_canonical = existing.as_ref().map_or_else(
552 || new_canonical_name.to_owned(),
553 |e| e.canonical_name.clone(),
554 );
555 let existing_name_owned = existing
556 .as_ref()
557 .map_or_else(|| new_surface_name.to_owned(), |e| e.name.clone());
558 self.store
559 .upsert_entity(
560 &existing_name_owned,
561 &existing_canonical,
562 entity_type,
563 summary_opt,
564 )
565 .await?;
566
567 let existing_point_id = existing
569 .as_ref()
570 .and_then(|e| e.qdrant_point_id.as_deref())
571 .map(ToOwned::to_owned);
572
573 let embed_text = format!("{existing_name_owned}: {merged_summary}");
575 let embed_result = tokio::time::timeout(
576 std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
577 provider.embed(&embed_text),
578 )
579 .await;
580
581 match embed_result {
582 Ok(Ok(vec)) => {
583 self.store_entity_embedding(
584 emb_store,
585 entity_id,
586 existing_point_id.as_deref(),
587 &existing_name_owned,
588 entity_type,
589 &merged_summary,
590 &vec,
591 )
592 .await;
593 }
594 Ok(Err(err)) => {
595 tracing::warn!(
596 entity_id,
597 error = %err,
598 "merge re-embed failed; Qdrant entry may be stale"
599 );
600 }
601 Err(_) => {
602 tracing::warn!(
603 entity_id,
604 "merge re-embed timed out; Qdrant entry may be stale"
605 );
606 }
607 }
608
609 Ok(())
610 }
611
612 #[allow(clippy::too_many_arguments)]
620 async fn store_entity_embedding(
621 &self,
622 emb_store: &EmbeddingStore,
623 entity_id: i64,
624 existing_point_id: Option<&str>,
625 name: &str,
626 entity_type: EntityType,
627 summary: &str,
628 vector: &[f32],
629 ) {
630 let vector_size = u64::try_from(vector.len()).unwrap_or(384);
634 if let Err(err) = emb_store
635 .ensure_named_collection(ENTITY_COLLECTION, vector_size)
636 .await
637 {
638 tracing::error!(
639 error = %err,
640 "failed to ensure entity embedding collection; skipping Qdrant upsert"
641 );
642 return;
643 }
644
645 let payload = serde_json::json!({
646 "entity_id": entity_id,
647 "name": name,
648 "entity_type": entity_type.as_str(),
649 "summary": summary,
650 });
651
652 if let Some(point_id) = existing_point_id {
653 if let Err(err) = emb_store
655 .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
656 .await
657 {
658 tracing::warn!(
659 entity_id,
660 error = %err,
661 "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
662 );
663 }
664 } else {
665 match emb_store
666 .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
667 .await
668 {
669 Ok(point_id) => {
670 if let Err(err) = self
671 .store
672 .set_entity_qdrant_point_id(entity_id, &point_id)
673 .await
674 {
675 tracing::warn!(
676 entity_id,
677 error = %err,
678 "failed to store qdrant_point_id in SQLite"
679 );
680 }
681 }
682 Err(err) => {
683 tracing::warn!(
684 entity_id,
685 error = %err,
686 "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
687 );
688 }
689 }
690 }
691 }
692
693 #[allow(clippy::too_many_arguments)]
697 async fn llm_disambiguate(
698 &self,
699 provider: &AnyProvider,
700 new_name: &str,
701 new_type: &str,
702 new_summary: &str,
703 existing_name: &str,
704 existing_type: &str,
705 existing_summary: &str,
706 score: f32,
707 ) -> Option<bool> {
708 let prompt = format!(
709 "New entity:\n\
710 - Name: <external-data>{new_name}</external-data>\n\
711 - Type: <external-data>{new_type}</external-data>\n\
712 - Summary: <external-data>{new_summary}</external-data>\n\
713 \n\
714 Existing entity:\n\
715 - Name: <external-data>{existing_name}</external-data>\n\
716 - Type: <external-data>{existing_type}</external-data>\n\
717 - Summary: <external-data>{existing_summary}</external-data>\n\
718 \n\
719 Cosine similarity: {score:.2}\n\
720 \n\
721 Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
722 );
723
724 let messages = [
725 Message::from_legacy(
726 Role::System,
727 "You are an entity disambiguation assistant. Given a new entity mention and \
728 an existing entity from the knowledge graph, determine if they refer to the same \
729 real-world entity. Respond only with JSON.",
730 ),
731 Message::from_legacy(Role::User, prompt),
732 ];
733
734 let response = match provider.chat(&messages).await {
735 Ok(r) => r,
736 Err(err) => {
737 tracing::warn!(error = %err, "LLM disambiguation chat failed");
738 return None;
739 }
740 };
741
742 let json_str = extract_json(&response);
744 match serde_json::from_str::<DisambiguationResponse>(json_str) {
745 Ok(parsed) => Some(parsed.same_entity),
746 Err(err) => {
747 tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
748 None
749 }
750 }
751 }
752
753 pub async fn resolve_batch(
766 &self,
767 entities: &[ExtractedEntity],
768 ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
769 if entities.is_empty() {
770 return Ok(Vec::new());
771 }
772
773 let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
775
776 let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
777 let name = e.name.clone();
778 let entity_type = e.entity_type.clone();
779 let summary = e.summary.clone();
780 async move {
781 let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
782 (i, result)
783 }
784 }))
785 .buffer_unordered(4);
786
787 while let Some((i, result)) = stream.next().await {
788 match result {
789 Ok(outcome) => results[i] = Some(outcome),
790 Err(err) => return Err(err),
791 }
792 }
793
794 Ok(results
795 .into_iter()
796 .enumerate()
797 .map(|(i, r)| {
798 r.unwrap_or_else(|| {
799 tracing::warn!(
800 index = i,
801 "resolve_batch: missing result at index — bug in stream collection"
802 );
803 panic!("resolve_batch: missing result at index {i}")
804 })
805 })
806 .collect())
807 }
808
809 pub async fn resolve_edge(
823 &self,
824 source_id: i64,
825 target_id: i64,
826 relation: &str,
827 fact: &str,
828 confidence: f32,
829 episode_id: Option<MessageId>,
830 ) -> Result<Option<i64>, MemoryError> {
831 let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
832 let normalized_relation = truncate_to_bytes(&relation_clean, MAX_RELATION_BYTES).to_owned();
833
834 let fact_clean = strip_control_chars(fact.trim());
835 let normalized_fact = truncate_to_bytes(&fact_clean, MAX_FACT_BYTES).to_owned();
836
837 let existing_edges = self.store.edges_exact(source_id, target_id).await?;
839
840 let matching = existing_edges
841 .iter()
842 .find(|e| e.relation == normalized_relation);
843
844 if let Some(old) = matching {
845 if old.fact == normalized_fact {
846 return Ok(None);
848 }
849 self.store.invalidate_edge(old.id).await?;
851 }
852
853 let new_id = self
854 .store
855 .insert_edge(
856 source_id,
857 target_id,
858 &normalized_relation,
859 &normalized_fact,
860 confidence,
861 episode_id,
862 )
863 .await?;
864 Ok(Some(new_id))
865 }
866}
867
/// Extracts the JSON payload from an LLM reply.
///
/// Handles:
/// - replies wrapped in a Markdown code fence with any info string (`json`, `JSON`, `jsonc`, …);
/// - replies with prose around the object.
///
/// When the fence body (or, for unfenced replies, the whole reply) contains a `{ … }` span, the
/// slice from its first `{` to its last `}` is returned. Otherwise the trimmed body is returned
/// unchanged, so the caller's parser reports the problem.
fn extract_json(s: &str) -> &str {
    let trimmed = s.trim();

    // Body of a ```-fenced reply. Dropping a leading lowercase `json` tag keeps non-object
    // bodies (e.g. arrays) clean.
    let body = trimmed
        .strip_prefix("```")
        .and_then(|rest| rest.rfind("```").map(|end| &rest[..end]))
        .map_or(trimmed, |inner| {
            inner.strip_prefix("json").unwrap_or(inner).trim()
        });

    // Narrow to the outermost object. This also skips other info strings ("JSON", "jsonc")
    // and any surrounding prose.
    match (body.find('{'), body.rfind('}')) {
        (Some(start), Some(end)) if start <= end => &body[start..=end],
        _ => body,
    }
}
890
891#[cfg(test)]
892mod tests {
893 use std::sync::Arc;
894
895 use super::*;
896 use crate::in_memory_store::InMemoryVectorStore;
897 use crate::sqlite::SqliteStore;
898
899 async fn setup() -> GraphStore {
900 let store = SqliteStore::new(":memory:").await.unwrap();
901 GraphStore::new(store.pool().clone())
902 }
903
904 async fn setup_with_embedding() -> (GraphStore, Arc<EmbeddingStore>) {
905 let sqlite = SqliteStore::new(":memory:").await.unwrap();
906 let pool = sqlite.pool().clone();
907 let mem_store = Box::new(InMemoryVectorStore::new());
908 let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool));
909 let gs = GraphStore::new(sqlite.pool().clone());
910 (gs, emb)
911 }
912
913 fn make_mock_provider_with_embedding(embedding: Vec<f32>) -> zeph_llm::mock::MockProvider {
914 let mut p = zeph_llm::mock::MockProvider::default();
915 p.embedding = embedding;
916 p.supports_embeddings = true;
917 p
918 }
919
920 #[tokio::test]
923 async fn resolve_creates_new_entity() {
924 let gs = setup().await;
925 let resolver = EntityResolver::new(&gs);
926 let (id, outcome) = resolver
927 .resolve("alice", "person", Some("a person"))
928 .await
929 .unwrap();
930 assert!(id > 0);
931 assert_eq!(outcome, ResolutionOutcome::Created);
932 }
933
934 #[tokio::test]
935 async fn resolve_updates_existing_entity() {
936 let gs = setup().await;
937 let resolver = EntityResolver::new(&gs);
938 let (id1, _) = resolver.resolve("alice", "person", None).await.unwrap();
939 let (id2, outcome) = resolver
940 .resolve("alice", "person", Some("updated summary"))
941 .await
942 .unwrap();
943 assert_eq!(id1, id2);
944 assert_eq!(outcome, ResolutionOutcome::ExactMatch);
945
946 let entity = gs
947 .find_entity("alice", EntityType::Person)
948 .await
949 .unwrap()
950 .unwrap();
951 assert_eq!(entity.summary.as_deref(), Some("updated summary"));
952 }
953
954 #[tokio::test]
955 async fn resolve_unknown_type_falls_back_to_concept() {
956 let gs = setup().await;
957 let resolver = EntityResolver::new(&gs);
958 let (id, _) = resolver
959 .resolve("my_thing", "unknown_type", None)
960 .await
961 .unwrap();
962 assert!(id > 0);
963
964 let entity = gs
966 .find_entity("my_thing", EntityType::Concept)
967 .await
968 .unwrap();
969 assert!(entity.is_some());
970 }
971
972 #[tokio::test]
973 async fn resolve_empty_name_returns_error() {
974 let gs = setup().await;
975 let resolver = EntityResolver::new(&gs);
976
977 let result_empty = resolver.resolve("", "concept", None).await;
978 assert!(result_empty.is_err());
979 assert!(matches!(
980 result_empty.unwrap_err(),
981 MemoryError::GraphStore(_)
982 ));
983
984 let result_whitespace = resolver.resolve(" ", "concept", None).await;
985 assert!(result_whitespace.is_err());
986 }
987
988 #[tokio::test]
989 async fn resolve_case_insensitive() {
990 let gs = setup().await;
991 let resolver = EntityResolver::new(&gs);
992
993 let (id1, _) = resolver.resolve("Rust", "language", None).await.unwrap();
994 let (id2, outcome) = resolver.resolve("rust", "language", None).await.unwrap();
995 assert_eq!(
996 id1, id2,
997 "'Rust' and 'rust' should resolve to the same entity"
998 );
999 assert_eq!(outcome, ResolutionOutcome::ExactMatch);
1000 }
1001
1002 #[tokio::test]
1003 async fn resolve_edge_inserts_new() {
1004 let gs = setup().await;
1005 let resolver = EntityResolver::new(&gs);
1006
1007 let src = gs
1008 .upsert_entity("src", "src", EntityType::Concept, None)
1009 .await
1010 .unwrap();
1011 let tgt = gs
1012 .upsert_entity("tgt", "tgt", EntityType::Concept, None)
1013 .await
1014 .unwrap();
1015
1016 let result = resolver
1017 .resolve_edge(src, tgt, "uses", "src uses tgt", 0.9, None)
1018 .await
1019 .unwrap();
1020 assert!(result.is_some());
1021 assert!(result.unwrap() > 0);
1022 }
1023
1024 #[tokio::test]
1025 async fn resolve_edge_deduplicates_identical() {
1026 let gs = setup().await;
1027 let resolver = EntityResolver::new(&gs);
1028
1029 let src = gs
1030 .upsert_entity("a", "a", EntityType::Concept, None)
1031 .await
1032 .unwrap();
1033 let tgt = gs
1034 .upsert_entity("b", "b", EntityType::Concept, None)
1035 .await
1036 .unwrap();
1037
1038 let first = resolver
1039 .resolve_edge(src, tgt, "uses", "a uses b", 0.9, None)
1040 .await
1041 .unwrap();
1042 assert!(first.is_some());
1043
1044 let second = resolver
1045 .resolve_edge(src, tgt, "uses", "a uses b", 0.9, None)
1046 .await
1047 .unwrap();
1048 assert!(second.is_none(), "identical edge should be deduplicated");
1049 }
1050
1051 #[tokio::test]
1052 async fn resolve_edge_supersedes_contradictory() {
1053 let gs = setup().await;
1054 let resolver = EntityResolver::new(&gs);
1055
1056 let src = gs
1057 .upsert_entity("x", "x", EntityType::Concept, None)
1058 .await
1059 .unwrap();
1060 let tgt = gs
1061 .upsert_entity("y", "y", EntityType::Concept, None)
1062 .await
1063 .unwrap();
1064
1065 let first_id = resolver
1066 .resolve_edge(src, tgt, "prefers", "x prefers y (old)", 0.8, None)
1067 .await
1068 .unwrap()
1069 .unwrap();
1070
1071 let second_id = resolver
1072 .resolve_edge(src, tgt, "prefers", "x prefers y (new)", 0.9, None)
1073 .await
1074 .unwrap()
1075 .unwrap();
1076
1077 assert_ne!(first_id, second_id, "superseded edge should have a new ID");
1078
1079 let active_count = gs.active_edge_count().await.unwrap();
1081 assert_eq!(active_count, 1, "only new edge should be active");
1082 }
1083
1084 #[tokio::test]
1085 async fn resolve_edge_direction_sensitive() {
1086 let gs = setup().await;
1088 let resolver = EntityResolver::new(&gs);
1089
1090 let a = gs
1091 .upsert_entity("node_a", "node_a", EntityType::Concept, None)
1092 .await
1093 .unwrap();
1094 let b = gs
1095 .upsert_entity("node_b", "node_b", EntityType::Concept, None)
1096 .await
1097 .unwrap();
1098
1099 let id1 = resolver
1101 .resolve_edge(a, b, "uses", "A uses B", 0.9, None)
1102 .await
1103 .unwrap();
1104 assert!(id1.is_some());
1105
1106 let id2 = resolver
1108 .resolve_edge(b, a, "uses", "B uses A (different direction)", 0.9, None)
1109 .await
1110 .unwrap();
1111 assert!(id2.is_some());
1112
1113 let active_count = gs.active_edge_count().await.unwrap();
1115 assert_eq!(active_count, 2, "both directional edges should be active");
1116 }
1117
1118 #[tokio::test]
1119 async fn resolve_edge_normalizes_relation_case() {
1120 let gs = setup().await;
1121 let resolver = EntityResolver::new(&gs);
1122
1123 let src = gs
1124 .upsert_entity("p", "p", EntityType::Concept, None)
1125 .await
1126 .unwrap();
1127 let tgt = gs
1128 .upsert_entity("q", "q", EntityType::Concept, None)
1129 .await
1130 .unwrap();
1131
1132 let id1 = resolver
1134 .resolve_edge(src, tgt, "Uses", "p uses q", 0.9, None)
1135 .await
1136 .unwrap();
1137 assert!(id1.is_some());
1138
1139 let id2 = resolver
1141 .resolve_edge(src, tgt, "uses", "p uses q", 0.9, None)
1142 .await
1143 .unwrap();
1144 assert!(id2.is_none(), "normalized relations should deduplicate");
1145 }
1146
1147 #[tokio::test]
1150 async fn resolve_entity_type_uppercase_parsed_correctly() {
1151 let gs = setup().await;
1152 let resolver = EntityResolver::new(&gs);
1153
1154 let (id, _) = resolver
1156 .resolve("test_entity", "Person", None)
1157 .await
1158 .unwrap();
1159 assert!(id > 0);
1160
1161 let entity = gs
1162 .find_entity("test_entity", EntityType::Person)
1163 .await
1164 .unwrap();
1165 assert!(entity.is_some(), "entity should be stored as Person type");
1166 }
1167
1168 #[tokio::test]
1169 async fn resolve_entity_type_all_caps_parsed_correctly() {
1170 let gs = setup().await;
1171 let resolver = EntityResolver::new(&gs);
1172
1173 let (id, _) = resolver.resolve("my_lang", "LANGUAGE", None).await.unwrap();
1174 assert!(id > 0);
1175
1176 let entity = gs
1177 .find_entity("my_lang", EntityType::Language)
1178 .await
1179 .unwrap();
1180 assert!(entity.is_some(), "entity should be stored as Language type");
1181 }
1182
1183 #[tokio::test]
1186 async fn resolve_truncates_long_entity_name() {
1187 let gs = setup().await;
1188 let resolver = EntityResolver::new(&gs);
1189
1190 let long_name = "a".repeat(1024);
1191 let (id, _) = resolver.resolve(&long_name, "concept", None).await.unwrap();
1192 assert!(id > 0);
1193
1194 let entity = gs
1196 .find_entity(&"a".repeat(512), EntityType::Concept)
1197 .await
1198 .unwrap();
1199 assert!(entity.is_some(), "truncated name should be stored");
1200 }
1201
1202 #[tokio::test]
1205 async fn resolve_strips_control_chars_from_name() {
1206 let gs = setup().await;
1207 let resolver = EntityResolver::new(&gs);
1208
1209 let name_with_ctrl = "rust\x00lang";
1211 let (id, _) = resolver
1212 .resolve(name_with_ctrl, "language", None)
1213 .await
1214 .unwrap();
1215 assert!(id > 0);
1216
1217 let entity = gs
1219 .find_entity("rustlang", EntityType::Language)
1220 .await
1221 .unwrap();
1222 assert!(
1223 entity.is_some(),
1224 "control chars should be stripped from stored name"
1225 );
1226 }
1227
1228 #[tokio::test]
1229 async fn resolve_strips_bidi_overrides_from_name() {
1230 let gs = setup().await;
1231 let resolver = EntityResolver::new(&gs);
1232
1233 let name_with_bidi = "rust\u{202E}lang";
1235 let (id, _) = resolver
1236 .resolve(name_with_bidi, "language", None)
1237 .await
1238 .unwrap();
1239 assert!(id > 0);
1240
1241 let entity = gs
1242 .find_entity("rustlang", EntityType::Language)
1243 .await
1244 .unwrap();
1245 assert!(entity.is_some(), "BiDi override chars should be stripped");
1246 }
1247
1248 #[test]
1251 fn strip_control_chars_removes_ascii_controls() {
1252 assert_eq!(strip_control_chars("hello\x00world"), "helloworld");
1253 assert_eq!(strip_control_chars("tab\there"), "tabhere");
1254 assert_eq!(strip_control_chars("new\nline"), "newline");
1255 }
1256
1257 #[test]
1258 fn strip_control_chars_removes_bidi() {
1259 let bidi = "\u{202E}spoof";
1260 assert_eq!(strip_control_chars(bidi), "spoof");
1261 }
1262
1263 #[test]
1264 fn strip_control_chars_preserves_normal_unicode() {
1265 assert_eq!(strip_control_chars("привет мир"), "привет мир");
1266 assert_eq!(strip_control_chars("日本語"), "日本語");
1267 }
1268
1269 #[test]
1270 fn truncate_to_bytes_exact_boundary() {
1271 let s = "hello";
1272 assert_eq!(truncate_to_bytes(s, 5), "hello");
1273 assert_eq!(truncate_to_bytes(s, 3), "hel");
1274 }
1275
1276 #[test]
1277 fn truncate_to_bytes_respects_utf8_boundary() {
1278 let s = "élan";
1280 let truncated = truncate_to_bytes(s, 1);
1281 assert!(s.is_char_boundary(truncated.len()));
1282 }
1283
1284 #[tokio::test]
1287 async fn resolve_with_embedding_store_score_above_threshold_merges() {
1288 let (gs, emb) = setup_with_embedding().await;
1289 let existing_id = gs
1293 .upsert_entity(
1294 "python programming lang",
1295 "python programming lang",
1296 EntityType::Language,
1297 Some("a programming language"),
1298 )
1299 .await
1300 .unwrap();
1301
1302 let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1303 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1304 .await
1305 .unwrap();
1306 let payload = serde_json::json!({
1307 "entity_id": existing_id,
1308 "name": "python programming lang",
1309 "entity_type": "language",
1310 "summary": "a programming language",
1311 });
1312 let point_id = emb
1313 .store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1314 .await
1315 .unwrap();
1316 gs.set_entity_qdrant_point_id(existing_id, &point_id)
1317 .await
1318 .unwrap();
1319
1320 let provider = make_mock_provider_with_embedding(mock_vec);
1322 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1323
1324 let resolver = EntityResolver::new(&gs)
1325 .with_embedding_store(&emb)
1326 .with_provider(&any_provider)
1327 .with_thresholds(0.85, 0.70);
1328
1329 let (id, outcome) = resolver
1331 .resolve(
1332 "python scripting lang",
1333 "language",
1334 Some("scripting language"),
1335 )
1336 .await
1337 .unwrap();
1338
1339 assert_eq!(id, existing_id, "should return existing entity ID on merge");
1340 assert!(
1341 matches!(outcome, ResolutionOutcome::EmbeddingMatch { score } if score > 0.85),
1342 "outcome should be EmbeddingMatch with score > 0.85, got {outcome:?}"
1343 );
1344 }
1345
1346 #[tokio::test]
1347 async fn resolve_with_embedding_store_score_below_ambiguous_creates_new() {
1348 let (gs, emb) = setup_with_embedding().await;
1349 let existing_id = gs
1351 .upsert_entity("java", "java", EntityType::Language, Some("java language"))
1352 .await
1353 .unwrap();
1354
1355 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1357 .await
1358 .unwrap();
1359 let payload = serde_json::json!({
1360 "entity_id": existing_id,
1361 "name": "java",
1362 "entity_type": "language",
1363 "summary": "java language",
1364 });
1365 emb.store_to_collection(ENTITY_COLLECTION, payload, vec![1.0, 0.0, 0.0, 0.0])
1366 .await
1367 .unwrap();
1368
1369 let provider = make_mock_provider_with_embedding(vec![0.0, 1.0, 0.0, 0.0]);
1371 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1372
1373 let resolver = EntityResolver::new(&gs)
1374 .with_embedding_store(&emb)
1375 .with_provider(&any_provider)
1376 .with_thresholds(0.85, 0.70);
1377
1378 let (id, outcome) = resolver
1379 .resolve("kotlin", "language", Some("kotlin language"))
1380 .await
1381 .unwrap();
1382
1383 assert_ne!(id, existing_id, "orthogonal entity should create new");
1384 assert_eq!(outcome, ResolutionOutcome::Created);
1385 }
1386
1387 #[tokio::test]
1388 async fn resolve_with_embedding_failure_falls_back_to_create() {
1389 let sqlite2 = SqliteStore::new(":memory:").await.unwrap();
1392 let pool2 = sqlite2.pool().clone();
1393 let mem2 = Box::new(InMemoryVectorStore::new());
1394 let emb2 = Arc::new(EmbeddingStore::with_store(mem2, pool2));
1395 let gs2 = GraphStore::new(sqlite2.pool().clone());
1396
1397 let mut mock = zeph_llm::mock::MockProvider::default();
1398 mock.supports_embeddings = false;
1399 let any_provider = zeph_llm::any::AnyProvider::Mock(mock);
1400
1401 let resolver = EntityResolver::new(&gs2)
1402 .with_embedding_store(&emb2)
1403 .with_provider(&any_provider);
1404
1405 let (id, outcome) = resolver
1406 .resolve("testentity", "concept", Some("summary"))
1407 .await
1408 .unwrap();
1409 assert!(id > 0);
1410 assert_eq!(outcome, ResolutionOutcome::Created);
1411 }
1412
1413 #[tokio::test]
1414 async fn resolve_fallback_increments_counter() {
1415 let (gs, emb) = setup_with_embedding().await;
1416
1417 let mut mock = zeph_llm::mock::MockProvider::default();
1419 mock.supports_embeddings = false;
1420 let any_provider = zeph_llm::any::AnyProvider::Mock(mock);
1421
1422 let resolver = EntityResolver::new(&gs)
1423 .with_embedding_store(&emb)
1424 .with_provider(&any_provider);
1425
1426 let fallback_count = resolver.fallback_count();
1427
1428 resolver.resolve("entity_a", "concept", None).await.unwrap();
1430
1431 assert_eq!(
1432 fallback_count.load(std::sync::atomic::Ordering::Relaxed),
1433 1,
1434 "fallback counter should be 1 after embed failure"
1435 );
1436 }
1437
1438 #[tokio::test]
1439 async fn resolve_batch_processes_multiple_entities() {
1440 let gs = setup().await;
1441 let resolver = EntityResolver::new(&gs);
1442
1443 let entities = vec![
1444 ExtractedEntity {
1445 name: "rust".into(),
1446 entity_type: "language".into(),
1447 summary: Some("systems language".into()),
1448 },
1449 ExtractedEntity {
1450 name: "python".into(),
1451 entity_type: "language".into(),
1452 summary: None,
1453 },
1454 ExtractedEntity {
1455 name: "cargo".into(),
1456 entity_type: "tool".into(),
1457 summary: Some("rust build tool".into()),
1458 },
1459 ];
1460
1461 let results = resolver.resolve_batch(&entities).await.unwrap();
1462 assert_eq!(results.len(), 3);
1463 for (id, outcome) in &results {
1464 assert!(*id > 0);
1465 assert_eq!(*outcome, ResolutionOutcome::Created);
1466 }
1467 }
1468
1469 #[tokio::test]
1470 async fn resolve_batch_empty_returns_empty() {
1471 let gs = setup().await;
1472 let resolver = EntityResolver::new(&gs);
1473 let results = resolver.resolve_batch(&[]).await.unwrap();
1474 assert!(results.is_empty());
1475 }
1476
1477 #[tokio::test]
1478 async fn merge_combines_summaries() {
1479 let (gs, emb) = setup_with_embedding().await;
1480 let existing_id = gs
1484 .upsert_entity(
1485 "mergetest v1",
1486 "mergetest v1",
1487 EntityType::Concept,
1488 Some("first summary"),
1489 )
1490 .await
1491 .unwrap();
1492
1493 let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1494 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1495 .await
1496 .unwrap();
1497 let payload = serde_json::json!({
1498 "entity_id": existing_id,
1499 "name": "mergetest v1",
1500 "entity_type": "concept",
1501 "summary": "first summary",
1502 });
1503 let point_id = emb
1504 .store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1505 .await
1506 .unwrap();
1507 gs.set_entity_qdrant_point_id(existing_id, &point_id)
1508 .await
1509 .unwrap();
1510
1511 let provider = make_mock_provider_with_embedding(mock_vec);
1512 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1513
1514 let resolver = EntityResolver::new(&gs)
1515 .with_embedding_store(&emb)
1516 .with_provider(&any_provider)
1517 .with_thresholds(0.85, 0.70);
1518
1519 let (id, outcome) = resolver
1521 .resolve("mergetest v2", "concept", Some("second summary"))
1522 .await
1523 .unwrap();
1524
1525 assert_eq!(id, existing_id);
1526 assert!(matches!(outcome, ResolutionOutcome::EmbeddingMatch { .. }));
1527
1528 let entity = gs
1530 .find_entity("mergetest v1", EntityType::Concept)
1531 .await
1532 .unwrap()
1533 .unwrap();
1534 let summary = entity.summary.unwrap_or_default();
1535 assert!(
1536 summary.contains("first summary") && summary.contains("second summary"),
1537 "merged summary should contain both: got {summary:?}"
1538 );
1539 }
1540
1541 #[tokio::test]
1542 async fn merge_preserves_older_entity_id() {
1543 let (gs, emb) = setup_with_embedding().await;
1544 let existing_id = gs
1546 .upsert_entity(
1547 "legacy entity",
1548 "legacy entity",
1549 EntityType::Concept,
1550 Some("old info"),
1551 )
1552 .await
1553 .unwrap();
1554
1555 let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1556 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1557 .await
1558 .unwrap();
1559 let payload = serde_json::json!({
1560 "entity_id": existing_id,
1561 "name": "legacy entity",
1562 "entity_type": "concept",
1563 "summary": "old info",
1564 });
1565 emb.store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1566 .await
1567 .unwrap();
1568
1569 let provider = make_mock_provider_with_embedding(mock_vec);
1570 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1571
1572 let resolver = EntityResolver::new(&gs)
1573 .with_embedding_store(&emb)
1574 .with_provider(&any_provider)
1575 .with_thresholds(0.85, 0.70);
1576
1577 let (returned_id, _) = resolver
1578 .resolve("legacy entity variant", "concept", Some("new info"))
1579 .await
1580 .unwrap();
1581
1582 assert_eq!(
1583 returned_id, existing_id,
1584 "older entity ID should be preserved on merge"
1585 );
1586 }
1587
1588 #[tokio::test]
1589 async fn entity_type_filter_prevents_cross_type_merge() {
1590 let (gs, emb) = setup_with_embedding().await;
1591
1592 let person_id = gs
1594 .upsert_entity(
1595 "python",
1596 "python",
1597 EntityType::Person,
1598 Some("a person named python"),
1599 )
1600 .await
1601 .unwrap();
1602
1603 let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1604 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1605 .await
1606 .unwrap();
1607 let payload = serde_json::json!({
1608 "entity_id": person_id,
1609 "name": "python",
1610 "entity_type": "person",
1611 "summary": "a person named python",
1612 });
1613 emb.store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1614 .await
1615 .unwrap();
1616
1617 let provider = make_mock_provider_with_embedding(mock_vec);
1618 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1619
1620 let resolver = EntityResolver::new(&gs)
1621 .with_embedding_store(&emb)
1622 .with_provider(&any_provider)
1623 .with_thresholds(0.85, 0.70);
1624
1625 let (lang_id, outcome) = resolver
1627 .resolve("python", "language", Some("python language"))
1628 .await
1629 .unwrap();
1630
1631 assert_ne!(
1634 lang_id, person_id,
1635 "language entity should not merge with person entity"
1636 );
1637 assert_eq!(outcome, ResolutionOutcome::Created);
1640 }
1641
1642 #[tokio::test]
1643 async fn custom_thresholds_respected() {
1644 let (gs, emb) = setup_with_embedding().await;
1645 let existing_id = gs
1649 .upsert_entity(
1650 "threshold_test",
1651 "threshold_test",
1652 EntityType::Concept,
1653 Some("base"),
1654 )
1655 .await
1656 .unwrap();
1657
1658 let existing_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1659 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1660 .await
1661 .unwrap();
1662 let payload = serde_json::json!({
1663 "entity_id": existing_id,
1664 "name": "threshold_test",
1665 "entity_type": "concept",
1666 "summary": "base",
1667 });
1668 emb.store_to_collection(ENTITY_COLLECTION, payload, existing_vec)
1669 .await
1670 .unwrap();
1671
1672 let provider = make_mock_provider_with_embedding(vec![0.0, 1.0, 0.0, 0.0]);
1674 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1675
1676 let resolver = EntityResolver::new(&gs)
1678 .with_embedding_store(&emb)
1679 .with_provider(&any_provider)
1680 .with_thresholds(0.50, 0.30);
1681
1682 let (id, outcome) = resolver
1683 .resolve("new_concept", "concept", Some("different"))
1684 .await
1685 .unwrap();
1686
1687 assert_ne!(id, existing_id);
1688 assert_eq!(outcome, ResolutionOutcome::Created);
1689 }
1690
1691 #[tokio::test]
1692 async fn resolve_outcome_exact_match_no_embedding_store() {
1693 let gs = setup().await;
1694 let resolver = EntityResolver::new(&gs);
1695
1696 resolver.resolve("existing", "concept", None).await.unwrap();
1697 let (_, outcome) = resolver.resolve("existing", "concept", None).await.unwrap();
1698 assert_eq!(outcome, ResolutionOutcome::ExactMatch);
1699 }
1700
1701 #[tokio::test]
1702 async fn extract_json_strips_markdown_fences() {
1703 let with_fence = "```json\n{\"same_entity\": true}\n```";
1704 let extracted = extract_json(with_fence);
1705 let parsed: DisambiguationResponse = serde_json::from_str(extracted).unwrap();
1706 assert!(parsed.same_entity);
1707
1708 let without_fence = "{\"same_entity\": false}";
1709 let extracted2 = extract_json(without_fence);
1710 let parsed2: DisambiguationResponse = serde_json::from_str(extracted2).unwrap();
1711 assert!(!parsed2.same_entity);
1712 }
1713
1714 fn make_mock_with_embedding_and_chat(
1716 embedding: Vec<f32>,
1717 chat_responses: Vec<String>,
1718 ) -> zeph_llm::mock::MockProvider {
1719 let mut p = zeph_llm::mock::MockProvider::with_responses(chat_responses);
1720 p.embedding = embedding;
1721 p.supports_embeddings = true;
1722 p
1723 }
1724
1725 async fn seed_entity_with_vector(
1727 gs: &GraphStore,
1728 emb: &Arc<EmbeddingStore>,
1729 name: &str,
1730 entity_type: EntityType,
1731 summary: &str,
1732 vector: Vec<f32>,
1733 ) -> i64 {
1734 let id = gs
1735 .upsert_entity(name, name, entity_type, Some(summary))
1736 .await
1737 .unwrap();
1738 emb.ensure_named_collection(ENTITY_COLLECTION, u64::try_from(vector.len()).unwrap())
1739 .await
1740 .unwrap();
1741 let payload = serde_json::json!({
1742 "entity_id": id,
1743 "name": name,
1744 "entity_type": entity_type.as_str(),
1745 "summary": summary,
1746 });
1747 let point_id = emb
1748 .store_to_collection(ENTITY_COLLECTION, payload, vector)
1749 .await
1750 .unwrap();
1751 gs.set_entity_qdrant_point_id(id, &point_id).await.unwrap();
1752 id
1753 }
1754
1755 #[tokio::test]
1758 async fn resolve_ambiguous_score_llm_says_merge() {
1759 let (gs, emb) = setup_with_embedding().await;
1762 let existing_id = seed_entity_with_vector(
1763 &gs,
1764 &emb,
1765 "goroutine",
1766 EntityType::Concept,
1767 "go concurrency primitive",
1768 vec![1.0, 0.0, 0.0, 0.0],
1769 )
1770 .await;
1771
1772 let provider = make_mock_with_embedding_and_chat(
1774 vec![1.0, 1.0, 0.0, 0.0],
1775 vec![r#"{"same_entity": true}"#.to_owned()],
1776 );
1777 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1778
1779 let resolver = EntityResolver::new(&gs)
1780 .with_embedding_store(&emb)
1781 .with_provider(&any_provider)
1782 .with_thresholds(0.85, 0.50);
1783
1784 let (id, outcome) = resolver
1785 .resolve("goroutine concurrency", "concept", Some("go concurrency"))
1786 .await
1787 .unwrap();
1788
1789 assert_eq!(
1790 id, existing_id,
1791 "should return existing entity ID on LLM merge"
1792 );
1793 assert_eq!(outcome, ResolutionOutcome::LlmDisambiguated);
1794 }
1795
1796 #[tokio::test]
1799 async fn resolve_ambiguous_score_llm_says_different() {
1800 let (gs, emb) = setup_with_embedding().await;
1801 let existing_id = seed_entity_with_vector(
1802 &gs,
1803 &emb,
1804 "channel",
1805 EntityType::Concept,
1806 "go channel",
1807 vec![1.0, 0.0, 0.0, 0.0],
1808 )
1809 .await;
1810
1811 let provider = make_mock_with_embedding_and_chat(
1813 vec![1.0, 1.0, 0.0, 0.0],
1814 vec![r#"{"same_entity": false}"#.to_owned()],
1815 );
1816 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1817
1818 let resolver = EntityResolver::new(&gs)
1819 .with_embedding_store(&emb)
1820 .with_provider(&any_provider)
1821 .with_thresholds(0.85, 0.50);
1822
1823 let (id, outcome) = resolver
1824 .resolve("network channel", "concept", Some("networking channel"))
1825 .await
1826 .unwrap();
1827
1828 assert_ne!(
1829 id, existing_id,
1830 "LLM-rejected match should create new entity"
1831 );
1832 assert_eq!(outcome, ResolutionOutcome::Created);
1833 }
1834
1835 #[tokio::test]
1838 async fn resolve_ambiguous_score_llm_failure_increments_fallback() {
1839 let (gs, emb) = setup_with_embedding().await;
1840 let existing_id = seed_entity_with_vector(
1841 &gs,
1842 &emb,
1843 "mutex",
1844 EntityType::Concept,
1845 "mutual exclusion lock",
1846 vec![1.0, 0.0, 0.0, 0.0],
1847 )
1848 .await;
1849
1850 let mut provider = make_mock_with_embedding_and_chat(vec![1.0, 1.0, 0.0, 0.0], vec![]);
1852 provider.fail_chat = true;
1853 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1854
1855 let resolver = EntityResolver::new(&gs)
1856 .with_embedding_store(&emb)
1857 .with_provider(&any_provider)
1858 .with_thresholds(0.85, 0.50);
1859
1860 let fallback_count = resolver.fallback_count();
1861
1862 let (id, outcome) = resolver
1863 .resolve("mutex lock", "concept", Some("synchronization primitive"))
1864 .await
1865 .unwrap();
1866
1867 assert_ne!(
1869 id, existing_id,
1870 "LLM failure should create new entity (fallback)"
1871 );
1872 assert_eq!(outcome, ResolutionOutcome::Created);
1873 assert_eq!(
1874 fallback_count.load(std::sync::atomic::Ordering::Relaxed),
1875 1,
1876 "fallback counter should be incremented on LLM chat failure"
1877 );
1878 }
1879
1880 #[tokio::test]
1883 async fn resolve_creates_entity_with_canonical_name() {
1884 let gs = setup().await;
1885 let resolver = EntityResolver::new(&gs);
1886 let (id, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1887 assert!(id > 0);
1888 let entity = gs
1889 .find_entity("rust", EntityType::Language)
1890 .await
1891 .unwrap()
1892 .unwrap();
1893 assert_eq!(entity.canonical_name, "rust");
1894 }
1895
1896 #[tokio::test]
1897 async fn resolve_adds_alias_on_create() {
1898 let gs = setup().await;
1899 let resolver = EntityResolver::new(&gs);
1900 let (id, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1901 let aliases = gs.aliases_for_entity(id).await.unwrap();
1902 assert!(
1903 !aliases.is_empty(),
1904 "new entity should have at least one alias"
1905 );
1906 assert!(aliases.iter().any(|a| a.alias_name == "rust"));
1907 }
1908
1909 #[tokio::test]
1910 async fn resolve_reuses_entity_by_alias() {
1911 let gs = setup().await;
1912 let resolver = EntityResolver::new(&gs);
1913
1914 let (id1, _) = resolver.resolve("rust", "language", None).await.unwrap();
1916 gs.add_alias(id1, "rust-lang").await.unwrap();
1917
1918 let (id2, _) = resolver
1920 .resolve("rust-lang", "language", None)
1921 .await
1922 .unwrap();
1923 assert_eq!(
1924 id1, id2,
1925 "'rust-lang' alias should resolve to same entity as 'rust'"
1926 );
1927 }
1928
1929 #[tokio::test]
1930 async fn resolve_alias_match_respects_entity_type() {
1931 let gs = setup().await;
1932 let resolver = EntityResolver::new(&gs);
1933
1934 let (lang_id, _) = resolver.resolve("python", "language", None).await.unwrap();
1936
1937 let (tool_id, _) = resolver.resolve("python", "tool", None).await.unwrap();
1939 assert_ne!(
1940 lang_id, tool_id,
1941 "same name with different type should be separate entities"
1942 );
1943 }
1944
1945 #[tokio::test]
1946 async fn resolve_preserves_existing_aliases() {
1947 let gs = setup().await;
1948 let resolver = EntityResolver::new(&gs);
1949
1950 let (id, _) = resolver.resolve("rust", "language", None).await.unwrap();
1951 gs.add_alias(id, "rust-lang").await.unwrap();
1952
1953 resolver
1955 .resolve("rust", "language", Some("updated"))
1956 .await
1957 .unwrap();
1958 let aliases = gs.aliases_for_entity(id).await.unwrap();
1959 assert!(
1960 aliases.iter().any(|a| a.alias_name == "rust-lang"),
1961 "prior alias must be preserved"
1962 );
1963 }
1964
1965 #[tokio::test]
1966 async fn resolve_original_form_registered_as_alias() {
1967 let gs = setup().await;
1968 let resolver = EntityResolver::new(&gs);
1969
1970 let (id, _) = resolver
1973 .resolve(" Rust ", "language", None)
1974 .await
1975 .unwrap();
1976 let aliases = gs.aliases_for_entity(id).await.unwrap();
1977 assert!(aliases.iter().any(|a| a.alias_name == "rust"));
1978 }
1979
1980 #[tokio::test]
1981 async fn resolve_entity_with_many_aliases() {
1982 let gs = setup().await;
1983 let id = gs
1984 .upsert_entity("bigentity", "bigentity", EntityType::Concept, None)
1985 .await
1986 .unwrap();
1987 for i in 0..100 {
1988 gs.add_alias(id, &format!("alias-{i}")).await.unwrap();
1989 }
1990 let aliases = gs.aliases_for_entity(id).await.unwrap();
1991 assert_eq!(aliases.len(), 100);
1992
1993 let results = gs.find_entities_fuzzy("alias-50", 10).await.unwrap();
1995 assert!(results.iter().any(|e| e.id == id));
1996 }
1997}