1use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_llm::any::AnyProvider;
12use zeph_llm::provider::{LlmProvider as _, Message, Role};
13
14use super::store::GraphStore;
15use super::types::EntityType;
16use crate::embedding_store::EmbeddingStore;
17use crate::error::MemoryError;
18use crate::graph::extractor::ExtractedEntity;
19use crate::types::MessageId;
20use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
21
/// Byte cap for a normalized entity name (applied on a UTF-8 boundary).
const MAX_ENTITY_NAME_BYTES: usize = 512;
/// Byte cap for a normalized edge relation label.
const MAX_RELATION_BYTES: usize = 256;
/// Byte cap for edge facts and for summary text used in embeddings and merged summaries.
const MAX_FACT_BYTES: usize = 2048;

/// Vector-store collection that holds one point per graph entity.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Seconds to wait for `embed()` before falling back to creating a new entity.
const EMBED_TIMEOUT_SECS: u64 = 30;
34
/// Removes characters that can corrupt stored text or spoof how it renders.
///
/// Drops every character for which `char::is_control` is true (NUL, tab, newline, ...) as well
/// as the BiDi embedding/override characters U+202A..=U+202E and the BiDi isolates
/// U+2066..=U+2069. All other characters, including non-Latin scripts, are kept in order.
fn strip_control_chars(s: &str) -> String {
    let is_bidi_control = |c: char| matches!(u32::from(c), 0x202A..=0x202E | 0x2066..=0x2069);

    let mut kept = String::with_capacity(s.len());
    for c in s.chars() {
        if c.is_control() || is_bidi_control(c) {
            continue;
        }
        kept.push(c);
    }
    kept
}
41
/// Returns the longest prefix of `s` that is at most `max_bytes` bytes long and does not split a
/// UTF-8 character. `s` itself is returned when it already fits; no allocation happens.
fn truncate_to_bytes(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Walk back from the limit to the nearest char boundary; byte 0 always is one.
    let end = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..end]
}
53
/// How [`EntityResolver::resolve`] arrived at the entity id it returned.
#[derive(Debug, Clone, PartialEq)]
pub enum ResolutionOutcome {
    /// The normalized name matched an existing alias or canonical name of the same type.
    ExactMatch,
    /// The best vector-search candidate scored at or above the similarity threshold and the
    /// mention was merged into it; `score` is the score reported by the search.
    EmbeddingMatch { score: f32 },
    /// The best candidate scored in the ambiguous band and the LLM confirmed it is the same
    /// entity, so the mention was merged into it.
    LlmDisambiguated,
    /// No existing entity was accepted; a new entity was upserted.
    Created,
}
66
/// Expected JSON reply from the disambiguation LLM, e.g. `{"same_entity": true}`.
#[derive(Debug, Deserialize, JsonSchema)]
struct DisambiguationResponse {
    /// `true` when the model judged both mentions to be the same real-world entity.
    same_entity: bool,
}
72
/// Normalized entity name -> async mutex used to serialize `resolve` calls for that name.
/// Entries are created on first use and never removed.
type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
82
/// Resolves extracted entity mentions to graph entity ids and deduplicates edges.
///
/// Exact (alias / canonical-name) matching always runs. Embedding similarity and LLM
/// disambiguation are only used when both an embedding store and a provider are configured.
pub struct EntityResolver<'a> {
    /// Graph store used for entity lookups, upserts, aliases and edges.
    store: &'a GraphStore,
    /// Optional vector store; together with `provider` enables similarity matching.
    embedding_store: Option<&'a Arc<EmbeddingStore>>,
    /// Optional LLM provider used for `embed()` and for disambiguation chats.
    provider: Option<&'a AnyProvider>,
    /// Score at or above which the best candidate is merged without asking the LLM.
    similarity_threshold: f32,
    /// Score at or above which (but below `similarity_threshold`) the LLM is asked.
    ambiguous_threshold: f32,
    /// Per-name locks. A fresh map is created in `new`, so only calls made through this
    /// resolver are serialized against each other.
    name_locks: NameLockMap,
    /// Number of times an embed / vector search / LLM step failed and resolution fell back.
    fallback_count: Arc<std::sync::atomic::AtomicU64>,
}
93
94impl<'a> EntityResolver<'a> {
95 #[must_use]
96 pub fn new(store: &'a GraphStore) -> Self {
97 Self {
98 store,
99 embedding_store: None,
100 provider: None,
101 similarity_threshold: 0.85,
102 ambiguous_threshold: 0.70,
103 name_locks: Arc::new(DashMap::new()),
104 fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
105 }
106 }
107
108 #[must_use]
109 pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
110 self.embedding_store = Some(store);
111 self
112 }
113
114 #[must_use]
115 pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
116 self.provider = Some(provider);
117 self
118 }
119
120 #[must_use]
121 pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
122 self.similarity_threshold = similarity;
123 self.ambiguous_threshold = ambiguous;
124 self
125 }
126
127 #[must_use]
129 pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
130 Arc::clone(&self.fallback_count)
131 }
132
133 fn normalize_name(name: &str) -> String {
135 let lowered = name.trim().to_lowercase();
136 let cleaned = strip_control_chars(&lowered);
137 let normalized = truncate_to_bytes(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
138 if normalized.len() < cleaned.len() {
139 tracing::debug!(
140 "graph resolver: entity name truncated to {} bytes",
141 MAX_ENTITY_NAME_BYTES
142 );
143 }
144 normalized
145 }
146
147 fn parse_entity_type(entity_type: &str) -> EntityType {
149 entity_type
150 .trim()
151 .to_lowercase()
152 .parse::<EntityType>()
153 .unwrap_or_else(|_| {
154 tracing::debug!(
155 "graph resolver: unknown entity type {:?}, falling back to Concept",
156 entity_type
157 );
158 EntityType::Concept
159 })
160 }
161
162 async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
164 let lock = self
165 .name_locks
166 .entry(normalized.to_owned())
167 .or_insert_with(|| Arc::new(Mutex::new(())))
168 .clone();
169 lock.lock_owned().await
170 }
171
172 #[allow(clippy::too_many_lines)]
191 pub async fn resolve(
192 &self,
193 name: &str,
194 entity_type: &str,
195 summary: Option<&str>,
196 ) -> Result<(i64, ResolutionOutcome), MemoryError> {
197 let normalized = Self::normalize_name(name);
198
199 if normalized.is_empty() {
200 return Err(MemoryError::GraphStore("empty entity name".into()));
201 }
202
203 let et = Self::parse_entity_type(entity_type);
204
205 let surface_name = name.trim().to_owned();
207
208 let _guard = self.lock_name(&normalized).await;
210
211 if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
213 self.store
214 .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
215 .await?;
216 return Ok((entity.id, ResolutionOutcome::ExactMatch));
217 }
218
219 if let Some(entity) = self.store.find_entity(&normalized, et).await? {
221 self.store
222 .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
223 .await?;
224 return Ok((entity.id, ResolutionOutcome::ExactMatch));
225 }
226
227 if let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) {
229 let safe_summary = truncate_to_bytes(summary.unwrap_or(""), MAX_FACT_BYTES);
230 let embed_text = format!("{normalized}: {safe_summary}");
231
232 let embed_result = tokio::time::timeout(
233 std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
234 provider.embed(&embed_text),
235 )
236 .await;
237
238 match embed_result {
239 Ok(Ok(query_vec)) => {
240 let type_filter = VectorFilter {
241 must: vec![FieldCondition {
242 field: "entity_type".into(),
243 value: FieldValue::Text(et.as_str().to_owned()),
244 }],
245 must_not: vec![],
246 };
247 match emb_store
248 .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
249 .await
250 {
251 Ok(candidates) if !candidates.is_empty() => {
252 let best = &candidates[0];
253 let score = best.score;
254
255 if score >= self.similarity_threshold {
256 let entity_id = best
257 .payload
258 .get("entity_id")
259 .and_then(serde_json::Value::as_i64)
260 .ok_or_else(|| {
261 MemoryError::GraphStore(
262 "missing entity_id in payload".into(),
263 )
264 })?;
265
266 self.merge_entity(
267 emb_store,
268 provider,
269 entity_id,
270 &surface_name,
271 &normalized,
272 et,
273 summary,
274 )
275 .await?;
276
277 return Ok((
278 entity_id,
279 ResolutionOutcome::EmbeddingMatch { score },
280 ));
281 } else if score >= self.ambiguous_threshold {
282 let entity_id = best
283 .payload
284 .get("entity_id")
285 .and_then(serde_json::Value::as_i64)
286 .ok_or_else(|| {
287 MemoryError::GraphStore(
288 "missing entity_id in payload".into(),
289 )
290 })?;
291 let existing_name = best
292 .payload
293 .get("name")
294 .and_then(|v| v.as_str())
295 .unwrap_or("")
296 .to_owned();
297 let existing_summary = best
298 .payload
299 .get("summary")
300 .and_then(|v| v.as_str())
301 .unwrap_or("")
302 .to_owned();
303 let existing_type = best
305 .payload
306 .get("entity_type")
307 .and_then(|v| v.as_str())
308 .unwrap_or(et.as_str())
309 .to_owned();
310
311 match self
312 .llm_disambiguate(
313 provider,
314 &normalized,
315 et.as_str(),
316 summary.unwrap_or(""),
317 &existing_name,
318 &existing_type,
319 &existing_summary,
320 score,
321 )
322 .await
323 {
324 Some(true) => {
325 self.merge_entity(
326 emb_store,
327 provider,
328 entity_id,
329 &surface_name,
330 &normalized,
331 et,
332 summary,
333 )
334 .await?;
335 return Ok((
336 entity_id,
337 ResolutionOutcome::LlmDisambiguated,
338 ));
339 }
340 Some(false) => {
341 }
343 None => {
344 self.fallback_count
346 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
347 tracing::warn!(
348 entity_name = %normalized,
349 "LLM disambiguation failed; falling back to create new entity"
350 );
351 }
352 }
353 }
354 }
356 Ok(_) => {
357 }
359 Err(err) => {
360 self.fallback_count
361 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
362 tracing::warn!(
363 entity_name = %normalized,
364 error = %err,
365 "Qdrant search failed; falling back to create new entity"
366 );
367 }
368 }
369
370 let entity_id = self
372 .store
373 .upsert_entity(&surface_name, &normalized, et, summary)
374 .await?;
375
376 self.register_aliases(entity_id, &normalized, name).await?;
377
378 self.store_entity_embedding(
379 emb_store,
380 entity_id,
381 None,
382 &normalized,
383 et,
384 summary.unwrap_or(""),
385 &query_vec,
386 )
387 .await;
388
389 return Ok((entity_id, ResolutionOutcome::Created));
390 }
391 Ok(Err(err)) => {
392 self.fallback_count
393 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
394 tracing::warn!(
395 entity_name = %normalized,
396 error = %err,
397 "embed() failed; falling back to exact-match-only entity creation"
398 );
399 }
400 Err(_timeout) => {
401 self.fallback_count
402 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
403 tracing::warn!(
404 entity_name = %normalized,
405 "embed() timed out after {}s; falling back to create new entity",
406 EMBED_TIMEOUT_SECS
407 );
408 }
409 }
410 }
411
412 let entity_id = self
414 .store
415 .upsert_entity(&surface_name, &normalized, et, summary)
416 .await?;
417
418 self.register_aliases(entity_id, &normalized, name).await?;
419
420 Ok((entity_id, ResolutionOutcome::Created))
421 }
422
423 async fn register_aliases(
425 &self,
426 entity_id: i64,
427 normalized: &str,
428 original_name: &str,
429 ) -> Result<(), MemoryError> {
430 self.store.add_alias(entity_id, normalized).await?;
431
432 let original_trimmed = original_name.trim().to_lowercase();
435 let original_clean_str = strip_control_chars(&original_trimmed);
436 let original_clean = truncate_to_bytes(&original_clean_str, MAX_ENTITY_NAME_BYTES);
437 if original_clean != normalized {
438 self.store.add_alias(entity_id, original_clean).await?;
439 }
440
441 Ok(())
442 }
443
444 #[allow(clippy::too_many_arguments)]
446 async fn merge_entity(
447 &self,
448 emb_store: &EmbeddingStore,
449 provider: &AnyProvider,
450 entity_id: i64,
451 new_surface_name: &str,
452 new_canonical_name: &str,
453 entity_type: EntityType,
454 new_summary: Option<&str>,
455 ) -> Result<(), MemoryError> {
456 let existing = self.store.find_entity_by_id(entity_id).await?;
459 let existing_summary = existing
460 .as_ref()
461 .and_then(|e| e.summary.as_deref())
462 .unwrap_or("");
463
464 let merged_summary = if let Some(new) = new_summary {
465 if !new.is_empty() && !existing_summary.is_empty() {
466 let combined = format!("{existing_summary}; {new}");
467 truncate_to_bytes(&combined, MAX_FACT_BYTES).to_owned()
469 } else if !new.is_empty() {
470 new.to_owned()
471 } else {
472 existing_summary.to_owned()
473 }
474 } else {
475 existing_summary.to_owned()
476 };
477
478 let summary_opt = if merged_summary.is_empty() {
479 None
480 } else {
481 Some(merged_summary.as_str())
482 };
483
484 let existing_canonical = existing.as_ref().map_or_else(
486 || new_canonical_name.to_owned(),
487 |e| e.canonical_name.clone(),
488 );
489 let existing_name_owned = existing
490 .as_ref()
491 .map_or_else(|| new_surface_name.to_owned(), |e| e.name.clone());
492 self.store
493 .upsert_entity(
494 &existing_name_owned,
495 &existing_canonical,
496 entity_type,
497 summary_opt,
498 )
499 .await?;
500
501 let existing_point_id = existing
503 .as_ref()
504 .and_then(|e| e.qdrant_point_id.as_deref())
505 .map(ToOwned::to_owned);
506
507 let embed_text = format!("{existing_name_owned}: {merged_summary}");
509 let embed_result = tokio::time::timeout(
510 std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
511 provider.embed(&embed_text),
512 )
513 .await;
514
515 match embed_result {
516 Ok(Ok(vec)) => {
517 self.store_entity_embedding(
518 emb_store,
519 entity_id,
520 existing_point_id.as_deref(),
521 &existing_name_owned,
522 entity_type,
523 &merged_summary,
524 &vec,
525 )
526 .await;
527 }
528 Ok(Err(err)) => {
529 tracing::warn!(
530 entity_id,
531 error = %err,
532 "merge re-embed failed; Qdrant entry may be stale"
533 );
534 }
535 Err(_) => {
536 tracing::warn!(
537 entity_id,
538 "merge re-embed timed out; Qdrant entry may be stale"
539 );
540 }
541 }
542
543 Ok(())
544 }
545
546 #[allow(clippy::too_many_arguments)]
554 async fn store_entity_embedding(
555 &self,
556 emb_store: &EmbeddingStore,
557 entity_id: i64,
558 existing_point_id: Option<&str>,
559 name: &str,
560 entity_type: EntityType,
561 summary: &str,
562 vector: &[f32],
563 ) {
564 let vector_size = u64::try_from(vector.len()).unwrap_or(384);
568 if let Err(err) = emb_store
569 .ensure_named_collection(ENTITY_COLLECTION, vector_size)
570 .await
571 {
572 tracing::error!(
573 error = %err,
574 "failed to ensure entity embedding collection; skipping Qdrant upsert"
575 );
576 return;
577 }
578
579 let payload = serde_json::json!({
580 "entity_id": entity_id,
581 "name": name,
582 "entity_type": entity_type.as_str(),
583 "summary": summary,
584 });
585
586 if let Some(point_id) = existing_point_id {
587 if let Err(err) = emb_store
589 .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
590 .await
591 {
592 tracing::warn!(
593 entity_id,
594 error = %err,
595 "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
596 );
597 }
598 } else {
599 match emb_store
600 .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
601 .await
602 {
603 Ok(point_id) => {
604 if let Err(err) = self
605 .store
606 .set_entity_qdrant_point_id(entity_id, &point_id)
607 .await
608 {
609 tracing::warn!(
610 entity_id,
611 error = %err,
612 "failed to store qdrant_point_id in SQLite"
613 );
614 }
615 }
616 Err(err) => {
617 tracing::warn!(
618 entity_id,
619 error = %err,
620 "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
621 );
622 }
623 }
624 }
625 }
626
627 #[allow(clippy::too_many_arguments)]
631 async fn llm_disambiguate(
632 &self,
633 provider: &AnyProvider,
634 new_name: &str,
635 new_type: &str,
636 new_summary: &str,
637 existing_name: &str,
638 existing_type: &str,
639 existing_summary: &str,
640 score: f32,
641 ) -> Option<bool> {
642 let prompt = format!(
643 "New entity:\n\
644 - Name: <external-data>{new_name}</external-data>\n\
645 - Type: <external-data>{new_type}</external-data>\n\
646 - Summary: <external-data>{new_summary}</external-data>\n\
647 \n\
648 Existing entity:\n\
649 - Name: <external-data>{existing_name}</external-data>\n\
650 - Type: <external-data>{existing_type}</external-data>\n\
651 - Summary: <external-data>{existing_summary}</external-data>\n\
652 \n\
653 Cosine similarity: {score:.2}\n\
654 \n\
655 Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
656 );
657
658 let messages = [
659 Message::from_legacy(
660 Role::System,
661 "You are an entity disambiguation assistant. Given a new entity mention and \
662 an existing entity from the knowledge graph, determine if they refer to the same \
663 real-world entity. Respond only with JSON.",
664 ),
665 Message::from_legacy(Role::User, prompt),
666 ];
667
668 let response = match provider.chat(&messages).await {
669 Ok(r) => r,
670 Err(err) => {
671 tracing::warn!(error = %err, "LLM disambiguation chat failed");
672 return None;
673 }
674 };
675
676 let json_str = extract_json(&response);
678 match serde_json::from_str::<DisambiguationResponse>(json_str) {
679 Ok(parsed) => Some(parsed.same_entity),
680 Err(err) => {
681 tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
682 None
683 }
684 }
685 }
686
687 pub async fn resolve_batch(
700 &self,
701 entities: &[ExtractedEntity],
702 ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
703 if entities.is_empty() {
704 return Ok(Vec::new());
705 }
706
707 let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
709
710 let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
711 let name = e.name.clone();
712 let entity_type = e.entity_type.clone();
713 let summary = e.summary.clone();
714 async move {
715 let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
716 (i, result)
717 }
718 }))
719 .buffer_unordered(4);
720
721 while let Some((i, result)) = stream.next().await {
722 match result {
723 Ok(outcome) => results[i] = Some(outcome),
724 Err(err) => return Err(err),
725 }
726 }
727
728 Ok(results
729 .into_iter()
730 .enumerate()
731 .map(|(i, r)| {
732 r.unwrap_or_else(|| {
733 tracing::warn!(
734 index = i,
735 "resolve_batch: missing result at index — bug in stream collection"
736 );
737 panic!("resolve_batch: missing result at index {i}")
738 })
739 })
740 .collect())
741 }
742
743 pub async fn resolve_edge(
757 &self,
758 source_id: i64,
759 target_id: i64,
760 relation: &str,
761 fact: &str,
762 confidence: f32,
763 episode_id: Option<MessageId>,
764 ) -> Result<Option<i64>, MemoryError> {
765 let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
766 let normalized_relation = truncate_to_bytes(&relation_clean, MAX_RELATION_BYTES).to_owned();
767
768 let fact_clean = strip_control_chars(fact.trim());
769 let normalized_fact = truncate_to_bytes(&fact_clean, MAX_FACT_BYTES).to_owned();
770
771 let existing_edges = self.store.edges_exact(source_id, target_id).await?;
773
774 let matching = existing_edges
775 .iter()
776 .find(|e| e.relation == normalized_relation);
777
778 if let Some(old) = matching {
779 if old.fact == normalized_fact {
780 return Ok(None);
782 }
783 self.store.invalidate_edge(old.id).await?;
785 }
786
787 let new_id = self
788 .store
789 .insert_edge(
790 source_id,
791 target_id,
792 &normalized_relation,
793 &normalized_fact,
794 confidence,
795 episode_id,
796 )
797 .await?;
798 Ok(Some(new_id))
799 }
800}
801
/// Extracts the JSON object from an LLM reply.
///
/// If the reply is wrapped in a Markdown code fence, the fence is removed together with its
/// info string on the opening line (`json`, `JSON`, `javascript`, ...). Then the span from the
/// first `{` to the last `}` is returned when one exists. Otherwise the remaining text is
/// returned trimmed (e.g. a bare `false` or `[..]` inside a fence).
fn extract_json(s: &str) -> &str {
    let trimmed = s.trim();

    let body = match trimmed.strip_prefix("```") {
        Some(after_open) => {
            let inner = match after_open.rfind("```") {
                Some(end) => &after_open[..end],
                None => after_open,
            };
            // Drop the fence info string, unless the first line already holds the payload.
            match inner.split_once('\n') {
                Some((info, code)) if !info.contains(|c| c == '{' || c == '[') => code,
                _ => inner,
            }
        }
        None => trimmed,
    };

    match (body.find('{'), body.rfind('}')) {
        (Some(start), Some(end)) if start < end => &body[start..=end],
        _ => body.trim(),
    }
}
824
825#[cfg(test)]
826mod tests {
827 use std::sync::Arc;
828
829 use super::*;
830 use crate::in_memory_store::InMemoryVectorStore;
831 use crate::sqlite::SqliteStore;
832
833 async fn setup() -> GraphStore {
834 let store = SqliteStore::new(":memory:").await.unwrap();
835 GraphStore::new(store.pool().clone())
836 }
837
838 async fn setup_with_embedding() -> (GraphStore, Arc<EmbeddingStore>) {
839 let sqlite = SqliteStore::new(":memory:").await.unwrap();
840 let pool = sqlite.pool().clone();
841 let mem_store = Box::new(InMemoryVectorStore::new());
842 let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool));
843 let gs = GraphStore::new(sqlite.pool().clone());
844 (gs, emb)
845 }
846
847 fn make_mock_provider_with_embedding(embedding: Vec<f32>) -> zeph_llm::mock::MockProvider {
848 let mut p = zeph_llm::mock::MockProvider::default();
849 p.embedding = embedding;
850 p.supports_embeddings = true;
851 p
852 }
853
854 #[tokio::test]
857 async fn resolve_creates_new_entity() {
858 let gs = setup().await;
859 let resolver = EntityResolver::new(&gs);
860 let (id, outcome) = resolver
861 .resolve("alice", "person", Some("a person"))
862 .await
863 .unwrap();
864 assert!(id > 0);
865 assert_eq!(outcome, ResolutionOutcome::Created);
866 }
867
868 #[tokio::test]
869 async fn resolve_updates_existing_entity() {
870 let gs = setup().await;
871 let resolver = EntityResolver::new(&gs);
872 let (id1, _) = resolver.resolve("alice", "person", None).await.unwrap();
873 let (id2, outcome) = resolver
874 .resolve("alice", "person", Some("updated summary"))
875 .await
876 .unwrap();
877 assert_eq!(id1, id2);
878 assert_eq!(outcome, ResolutionOutcome::ExactMatch);
879
880 let entity = gs
881 .find_entity("alice", EntityType::Person)
882 .await
883 .unwrap()
884 .unwrap();
885 assert_eq!(entity.summary.as_deref(), Some("updated summary"));
886 }
887
888 #[tokio::test]
889 async fn resolve_unknown_type_falls_back_to_concept() {
890 let gs = setup().await;
891 let resolver = EntityResolver::new(&gs);
892 let (id, _) = resolver
893 .resolve("my_thing", "unknown_type", None)
894 .await
895 .unwrap();
896 assert!(id > 0);
897
898 let entity = gs
900 .find_entity("my_thing", EntityType::Concept)
901 .await
902 .unwrap();
903 assert!(entity.is_some());
904 }
905
906 #[tokio::test]
907 async fn resolve_empty_name_returns_error() {
908 let gs = setup().await;
909 let resolver = EntityResolver::new(&gs);
910
911 let result_empty = resolver.resolve("", "concept", None).await;
912 assert!(result_empty.is_err());
913 assert!(matches!(
914 result_empty.unwrap_err(),
915 MemoryError::GraphStore(_)
916 ));
917
918 let result_whitespace = resolver.resolve(" ", "concept", None).await;
919 assert!(result_whitespace.is_err());
920 }
921
922 #[tokio::test]
923 async fn resolve_case_insensitive() {
924 let gs = setup().await;
925 let resolver = EntityResolver::new(&gs);
926
927 let (id1, _) = resolver.resolve("Rust", "language", None).await.unwrap();
928 let (id2, outcome) = resolver.resolve("rust", "language", None).await.unwrap();
929 assert_eq!(
930 id1, id2,
931 "'Rust' and 'rust' should resolve to the same entity"
932 );
933 assert_eq!(outcome, ResolutionOutcome::ExactMatch);
934 }
935
936 #[tokio::test]
937 async fn resolve_edge_inserts_new() {
938 let gs = setup().await;
939 let resolver = EntityResolver::new(&gs);
940
941 let src = gs
942 .upsert_entity("src", "src", EntityType::Concept, None)
943 .await
944 .unwrap();
945 let tgt = gs
946 .upsert_entity("tgt", "tgt", EntityType::Concept, None)
947 .await
948 .unwrap();
949
950 let result = resolver
951 .resolve_edge(src, tgt, "uses", "src uses tgt", 0.9, None)
952 .await
953 .unwrap();
954 assert!(result.is_some());
955 assert!(result.unwrap() > 0);
956 }
957
958 #[tokio::test]
959 async fn resolve_edge_deduplicates_identical() {
960 let gs = setup().await;
961 let resolver = EntityResolver::new(&gs);
962
963 let src = gs
964 .upsert_entity("a", "a", EntityType::Concept, None)
965 .await
966 .unwrap();
967 let tgt = gs
968 .upsert_entity("b", "b", EntityType::Concept, None)
969 .await
970 .unwrap();
971
972 let first = resolver
973 .resolve_edge(src, tgt, "uses", "a uses b", 0.9, None)
974 .await
975 .unwrap();
976 assert!(first.is_some());
977
978 let second = resolver
979 .resolve_edge(src, tgt, "uses", "a uses b", 0.9, None)
980 .await
981 .unwrap();
982 assert!(second.is_none(), "identical edge should be deduplicated");
983 }
984
985 #[tokio::test]
986 async fn resolve_edge_supersedes_contradictory() {
987 let gs = setup().await;
988 let resolver = EntityResolver::new(&gs);
989
990 let src = gs
991 .upsert_entity("x", "x", EntityType::Concept, None)
992 .await
993 .unwrap();
994 let tgt = gs
995 .upsert_entity("y", "y", EntityType::Concept, None)
996 .await
997 .unwrap();
998
999 let first_id = resolver
1000 .resolve_edge(src, tgt, "prefers", "x prefers y (old)", 0.8, None)
1001 .await
1002 .unwrap()
1003 .unwrap();
1004
1005 let second_id = resolver
1006 .resolve_edge(src, tgt, "prefers", "x prefers y (new)", 0.9, None)
1007 .await
1008 .unwrap()
1009 .unwrap();
1010
1011 assert_ne!(first_id, second_id, "superseded edge should have a new ID");
1012
1013 let active_count = gs.active_edge_count().await.unwrap();
1015 assert_eq!(active_count, 1, "only new edge should be active");
1016 }
1017
1018 #[tokio::test]
1019 async fn resolve_edge_direction_sensitive() {
1020 let gs = setup().await;
1022 let resolver = EntityResolver::new(&gs);
1023
1024 let a = gs
1025 .upsert_entity("node_a", "node_a", EntityType::Concept, None)
1026 .await
1027 .unwrap();
1028 let b = gs
1029 .upsert_entity("node_b", "node_b", EntityType::Concept, None)
1030 .await
1031 .unwrap();
1032
1033 let id1 = resolver
1035 .resolve_edge(a, b, "uses", "A uses B", 0.9, None)
1036 .await
1037 .unwrap();
1038 assert!(id1.is_some());
1039
1040 let id2 = resolver
1042 .resolve_edge(b, a, "uses", "B uses A (different direction)", 0.9, None)
1043 .await
1044 .unwrap();
1045 assert!(id2.is_some());
1046
1047 let active_count = gs.active_edge_count().await.unwrap();
1049 assert_eq!(active_count, 2, "both directional edges should be active");
1050 }
1051
1052 #[tokio::test]
1053 async fn resolve_edge_normalizes_relation_case() {
1054 let gs = setup().await;
1055 let resolver = EntityResolver::new(&gs);
1056
1057 let src = gs
1058 .upsert_entity("p", "p", EntityType::Concept, None)
1059 .await
1060 .unwrap();
1061 let tgt = gs
1062 .upsert_entity("q", "q", EntityType::Concept, None)
1063 .await
1064 .unwrap();
1065
1066 let id1 = resolver
1068 .resolve_edge(src, tgt, "Uses", "p uses q", 0.9, None)
1069 .await
1070 .unwrap();
1071 assert!(id1.is_some());
1072
1073 let id2 = resolver
1075 .resolve_edge(src, tgt, "uses", "p uses q", 0.9, None)
1076 .await
1077 .unwrap();
1078 assert!(id2.is_none(), "normalized relations should deduplicate");
1079 }
1080
1081 #[tokio::test]
1084 async fn resolve_entity_type_uppercase_parsed_correctly() {
1085 let gs = setup().await;
1086 let resolver = EntityResolver::new(&gs);
1087
1088 let (id, _) = resolver
1090 .resolve("test_entity", "Person", None)
1091 .await
1092 .unwrap();
1093 assert!(id > 0);
1094
1095 let entity = gs
1096 .find_entity("test_entity", EntityType::Person)
1097 .await
1098 .unwrap();
1099 assert!(entity.is_some(), "entity should be stored as Person type");
1100 }
1101
1102 #[tokio::test]
1103 async fn resolve_entity_type_all_caps_parsed_correctly() {
1104 let gs = setup().await;
1105 let resolver = EntityResolver::new(&gs);
1106
1107 let (id, _) = resolver.resolve("my_lang", "LANGUAGE", None).await.unwrap();
1108 assert!(id > 0);
1109
1110 let entity = gs
1111 .find_entity("my_lang", EntityType::Language)
1112 .await
1113 .unwrap();
1114 assert!(entity.is_some(), "entity should be stored as Language type");
1115 }
1116
1117 #[tokio::test]
1120 async fn resolve_truncates_long_entity_name() {
1121 let gs = setup().await;
1122 let resolver = EntityResolver::new(&gs);
1123
1124 let long_name = "a".repeat(1024);
1125 let (id, _) = resolver.resolve(&long_name, "concept", None).await.unwrap();
1126 assert!(id > 0);
1127
1128 let entity = gs
1130 .find_entity(&"a".repeat(512), EntityType::Concept)
1131 .await
1132 .unwrap();
1133 assert!(entity.is_some(), "truncated name should be stored");
1134 }
1135
1136 #[tokio::test]
1139 async fn resolve_strips_control_chars_from_name() {
1140 let gs = setup().await;
1141 let resolver = EntityResolver::new(&gs);
1142
1143 let name_with_ctrl = "rust\x00lang";
1145 let (id, _) = resolver
1146 .resolve(name_with_ctrl, "language", None)
1147 .await
1148 .unwrap();
1149 assert!(id > 0);
1150
1151 let entity = gs
1153 .find_entity("rustlang", EntityType::Language)
1154 .await
1155 .unwrap();
1156 assert!(
1157 entity.is_some(),
1158 "control chars should be stripped from stored name"
1159 );
1160 }
1161
1162 #[tokio::test]
1163 async fn resolve_strips_bidi_overrides_from_name() {
1164 let gs = setup().await;
1165 let resolver = EntityResolver::new(&gs);
1166
1167 let name_with_bidi = "rust\u{202E}lang";
1169 let (id, _) = resolver
1170 .resolve(name_with_bidi, "language", None)
1171 .await
1172 .unwrap();
1173 assert!(id > 0);
1174
1175 let entity = gs
1176 .find_entity("rustlang", EntityType::Language)
1177 .await
1178 .unwrap();
1179 assert!(entity.is_some(), "BiDi override chars should be stripped");
1180 }
1181
1182 #[test]
1185 fn strip_control_chars_removes_ascii_controls() {
1186 assert_eq!(strip_control_chars("hello\x00world"), "helloworld");
1187 assert_eq!(strip_control_chars("tab\there"), "tabhere");
1188 assert_eq!(strip_control_chars("new\nline"), "newline");
1189 }
1190
1191 #[test]
1192 fn strip_control_chars_removes_bidi() {
1193 let bidi = "\u{202E}spoof";
1194 assert_eq!(strip_control_chars(bidi), "spoof");
1195 }
1196
1197 #[test]
1198 fn strip_control_chars_preserves_normal_unicode() {
1199 assert_eq!(strip_control_chars("привет мир"), "привет мир");
1200 assert_eq!(strip_control_chars("日本語"), "日本語");
1201 }
1202
1203 #[test]
1204 fn truncate_to_bytes_exact_boundary() {
1205 let s = "hello";
1206 assert_eq!(truncate_to_bytes(s, 5), "hello");
1207 assert_eq!(truncate_to_bytes(s, 3), "hel");
1208 }
1209
1210 #[test]
1211 fn truncate_to_bytes_respects_utf8_boundary() {
1212 let s = "élan";
1214 let truncated = truncate_to_bytes(s, 1);
1215 assert!(s.is_char_boundary(truncated.len()));
1216 }
1217
1218 #[tokio::test]
1221 async fn resolve_with_embedding_store_score_above_threshold_merges() {
1222 let (gs, emb) = setup_with_embedding().await;
1223 let existing_id = gs
1227 .upsert_entity(
1228 "python programming lang",
1229 "python programming lang",
1230 EntityType::Language,
1231 Some("a programming language"),
1232 )
1233 .await
1234 .unwrap();
1235
1236 let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1237 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1238 .await
1239 .unwrap();
1240 let payload = serde_json::json!({
1241 "entity_id": existing_id,
1242 "name": "python programming lang",
1243 "entity_type": "language",
1244 "summary": "a programming language",
1245 });
1246 let point_id = emb
1247 .store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1248 .await
1249 .unwrap();
1250 gs.set_entity_qdrant_point_id(existing_id, &point_id)
1251 .await
1252 .unwrap();
1253
1254 let provider = make_mock_provider_with_embedding(mock_vec);
1256 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1257
1258 let resolver = EntityResolver::new(&gs)
1259 .with_embedding_store(&emb)
1260 .with_provider(&any_provider)
1261 .with_thresholds(0.85, 0.70);
1262
1263 let (id, outcome) = resolver
1265 .resolve(
1266 "python scripting lang",
1267 "language",
1268 Some("scripting language"),
1269 )
1270 .await
1271 .unwrap();
1272
1273 assert_eq!(id, existing_id, "should return existing entity ID on merge");
1274 assert!(
1275 matches!(outcome, ResolutionOutcome::EmbeddingMatch { score } if score > 0.85),
1276 "outcome should be EmbeddingMatch with score > 0.85, got {outcome:?}"
1277 );
1278 }
1279
1280 #[tokio::test]
1281 async fn resolve_with_embedding_store_score_below_ambiguous_creates_new() {
1282 let (gs, emb) = setup_with_embedding().await;
1283 let existing_id = gs
1285 .upsert_entity("java", "java", EntityType::Language, Some("java language"))
1286 .await
1287 .unwrap();
1288
1289 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1291 .await
1292 .unwrap();
1293 let payload = serde_json::json!({
1294 "entity_id": existing_id,
1295 "name": "java",
1296 "entity_type": "language",
1297 "summary": "java language",
1298 });
1299 emb.store_to_collection(ENTITY_COLLECTION, payload, vec![1.0, 0.0, 0.0, 0.0])
1300 .await
1301 .unwrap();
1302
1303 let provider = make_mock_provider_with_embedding(vec![0.0, 1.0, 0.0, 0.0]);
1305 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1306
1307 let resolver = EntityResolver::new(&gs)
1308 .with_embedding_store(&emb)
1309 .with_provider(&any_provider)
1310 .with_thresholds(0.85, 0.70);
1311
1312 let (id, outcome) = resolver
1313 .resolve("kotlin", "language", Some("kotlin language"))
1314 .await
1315 .unwrap();
1316
1317 assert_ne!(id, existing_id, "orthogonal entity should create new");
1318 assert_eq!(outcome, ResolutionOutcome::Created);
1319 }
1320
1321 #[tokio::test]
1322 async fn resolve_with_embedding_failure_falls_back_to_create() {
1323 let sqlite2 = SqliteStore::new(":memory:").await.unwrap();
1326 let pool2 = sqlite2.pool().clone();
1327 let mem2 = Box::new(InMemoryVectorStore::new());
1328 let emb2 = Arc::new(EmbeddingStore::with_store(mem2, pool2));
1329 let gs2 = GraphStore::new(sqlite2.pool().clone());
1330
1331 let mut mock = zeph_llm::mock::MockProvider::default();
1332 mock.supports_embeddings = false;
1333 let any_provider = zeph_llm::any::AnyProvider::Mock(mock);
1334
1335 let resolver = EntityResolver::new(&gs2)
1336 .with_embedding_store(&emb2)
1337 .with_provider(&any_provider);
1338
1339 let (id, outcome) = resolver
1340 .resolve("testentity", "concept", Some("summary"))
1341 .await
1342 .unwrap();
1343 assert!(id > 0);
1344 assert_eq!(outcome, ResolutionOutcome::Created);
1345 }
1346
1347 #[tokio::test]
1348 async fn resolve_fallback_increments_counter() {
1349 let (gs, emb) = setup_with_embedding().await;
1350
1351 let mut mock = zeph_llm::mock::MockProvider::default();
1353 mock.supports_embeddings = false;
1354 let any_provider = zeph_llm::any::AnyProvider::Mock(mock);
1355
1356 let resolver = EntityResolver::new(&gs)
1357 .with_embedding_store(&emb)
1358 .with_provider(&any_provider);
1359
1360 let fallback_count = resolver.fallback_count();
1361
1362 resolver.resolve("entity_a", "concept", None).await.unwrap();
1364
1365 assert_eq!(
1366 fallback_count.load(std::sync::atomic::Ordering::Relaxed),
1367 1,
1368 "fallback counter should be 1 after embed failure"
1369 );
1370 }
1371
1372 #[tokio::test]
1373 async fn resolve_batch_processes_multiple_entities() {
1374 let gs = setup().await;
1375 let resolver = EntityResolver::new(&gs);
1376
1377 let entities = vec![
1378 ExtractedEntity {
1379 name: "rust".into(),
1380 entity_type: "language".into(),
1381 summary: Some("systems language".into()),
1382 },
1383 ExtractedEntity {
1384 name: "python".into(),
1385 entity_type: "language".into(),
1386 summary: None,
1387 },
1388 ExtractedEntity {
1389 name: "cargo".into(),
1390 entity_type: "tool".into(),
1391 summary: Some("rust build tool".into()),
1392 },
1393 ];
1394
1395 let results = resolver.resolve_batch(&entities).await.unwrap();
1396 assert_eq!(results.len(), 3);
1397 for (id, outcome) in &results {
1398 assert!(*id > 0);
1399 assert_eq!(*outcome, ResolutionOutcome::Created);
1400 }
1401 }
1402
1403 #[tokio::test]
1404 async fn resolve_batch_empty_returns_empty() {
1405 let gs = setup().await;
1406 let resolver = EntityResolver::new(&gs);
1407 let results = resolver.resolve_batch(&[]).await.unwrap();
1408 assert!(results.is_empty());
1409 }
1410
1411 #[tokio::test]
1412 async fn merge_combines_summaries() {
1413 let (gs, emb) = setup_with_embedding().await;
1414 let existing_id = gs
1418 .upsert_entity(
1419 "mergetest v1",
1420 "mergetest v1",
1421 EntityType::Concept,
1422 Some("first summary"),
1423 )
1424 .await
1425 .unwrap();
1426
1427 let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1428 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1429 .await
1430 .unwrap();
1431 let payload = serde_json::json!({
1432 "entity_id": existing_id,
1433 "name": "mergetest v1",
1434 "entity_type": "concept",
1435 "summary": "first summary",
1436 });
1437 let point_id = emb
1438 .store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1439 .await
1440 .unwrap();
1441 gs.set_entity_qdrant_point_id(existing_id, &point_id)
1442 .await
1443 .unwrap();
1444
1445 let provider = make_mock_provider_with_embedding(mock_vec);
1446 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1447
1448 let resolver = EntityResolver::new(&gs)
1449 .with_embedding_store(&emb)
1450 .with_provider(&any_provider)
1451 .with_thresholds(0.85, 0.70);
1452
1453 let (id, outcome) = resolver
1455 .resolve("mergetest v2", "concept", Some("second summary"))
1456 .await
1457 .unwrap();
1458
1459 assert_eq!(id, existing_id);
1460 assert!(matches!(outcome, ResolutionOutcome::EmbeddingMatch { .. }));
1461
1462 let entity = gs
1464 .find_entity("mergetest v1", EntityType::Concept)
1465 .await
1466 .unwrap()
1467 .unwrap();
1468 let summary = entity.summary.unwrap_or_default();
1469 assert!(
1470 summary.contains("first summary") && summary.contains("second summary"),
1471 "merged summary should contain both: got {summary:?}"
1472 );
1473 }
1474
1475 #[tokio::test]
1476 async fn merge_preserves_older_entity_id() {
1477 let (gs, emb) = setup_with_embedding().await;
1478 let existing_id = gs
1480 .upsert_entity(
1481 "legacy entity",
1482 "legacy entity",
1483 EntityType::Concept,
1484 Some("old info"),
1485 )
1486 .await
1487 .unwrap();
1488
1489 let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1490 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1491 .await
1492 .unwrap();
1493 let payload = serde_json::json!({
1494 "entity_id": existing_id,
1495 "name": "legacy entity",
1496 "entity_type": "concept",
1497 "summary": "old info",
1498 });
1499 emb.store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1500 .await
1501 .unwrap();
1502
1503 let provider = make_mock_provider_with_embedding(mock_vec);
1504 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1505
1506 let resolver = EntityResolver::new(&gs)
1507 .with_embedding_store(&emb)
1508 .with_provider(&any_provider)
1509 .with_thresholds(0.85, 0.70);
1510
1511 let (returned_id, _) = resolver
1512 .resolve("legacy entity variant", "concept", Some("new info"))
1513 .await
1514 .unwrap();
1515
1516 assert_eq!(
1517 returned_id, existing_id,
1518 "older entity ID should be preserved on merge"
1519 );
1520 }
1521
1522 #[tokio::test]
1523 async fn entity_type_filter_prevents_cross_type_merge() {
1524 let (gs, emb) = setup_with_embedding().await;
1525
1526 let person_id = gs
1528 .upsert_entity(
1529 "python",
1530 "python",
1531 EntityType::Person,
1532 Some("a person named python"),
1533 )
1534 .await
1535 .unwrap();
1536
1537 let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1538 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1539 .await
1540 .unwrap();
1541 let payload = serde_json::json!({
1542 "entity_id": person_id,
1543 "name": "python",
1544 "entity_type": "person",
1545 "summary": "a person named python",
1546 });
1547 emb.store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1548 .await
1549 .unwrap();
1550
1551 let provider = make_mock_provider_with_embedding(mock_vec);
1552 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1553
1554 let resolver = EntityResolver::new(&gs)
1555 .with_embedding_store(&emb)
1556 .with_provider(&any_provider)
1557 .with_thresholds(0.85, 0.70);
1558
1559 let (lang_id, outcome) = resolver
1561 .resolve("python", "language", Some("python language"))
1562 .await
1563 .unwrap();
1564
1565 assert_ne!(
1568 lang_id, person_id,
1569 "language entity should not merge with person entity"
1570 );
1571 assert_eq!(outcome, ResolutionOutcome::Created);
1574 }
1575
1576 #[tokio::test]
1577 async fn custom_thresholds_respected() {
1578 let (gs, emb) = setup_with_embedding().await;
1579 let existing_id = gs
1583 .upsert_entity(
1584 "threshold_test",
1585 "threshold_test",
1586 EntityType::Concept,
1587 Some("base"),
1588 )
1589 .await
1590 .unwrap();
1591
1592 let existing_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1593 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1594 .await
1595 .unwrap();
1596 let payload = serde_json::json!({
1597 "entity_id": existing_id,
1598 "name": "threshold_test",
1599 "entity_type": "concept",
1600 "summary": "base",
1601 });
1602 emb.store_to_collection(ENTITY_COLLECTION, payload, existing_vec)
1603 .await
1604 .unwrap();
1605
1606 let provider = make_mock_provider_with_embedding(vec![0.0, 1.0, 0.0, 0.0]);
1608 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1609
1610 let resolver = EntityResolver::new(&gs)
1612 .with_embedding_store(&emb)
1613 .with_provider(&any_provider)
1614 .with_thresholds(0.50, 0.30);
1615
1616 let (id, outcome) = resolver
1617 .resolve("new_concept", "concept", Some("different"))
1618 .await
1619 .unwrap();
1620
1621 assert_ne!(id, existing_id);
1622 assert_eq!(outcome, ResolutionOutcome::Created);
1623 }
1624
1625 #[tokio::test]
1626 async fn resolve_outcome_exact_match_no_embedding_store() {
1627 let gs = setup().await;
1628 let resolver = EntityResolver::new(&gs);
1629
1630 resolver.resolve("existing", "concept", None).await.unwrap();
1631 let (_, outcome) = resolver.resolve("existing", "concept", None).await.unwrap();
1632 assert_eq!(outcome, ResolutionOutcome::ExactMatch);
1633 }
1634
/// `extract_json` accepts both ```json-fenced and bare JSON disambiguation replies.
///
/// Purely synchronous (nothing is awaited), so it uses a plain `#[test]` rather than
/// spinning up a tokio runtime.
#[test]
fn extract_json_strips_markdown_fences() {
    let with_fence = "```json\n{\"same_entity\": true}\n```";
    let extracted = extract_json(with_fence);
    let parsed: DisambiguationResponse = serde_json::from_str(extracted).unwrap();
    assert!(parsed.same_entity);

    let without_fence = "{\"same_entity\": false}";
    let extracted2 = extract_json(without_fence);
    let parsed2: DisambiguationResponse = serde_json::from_str(extracted2).unwrap();
    assert!(!parsed2.same_entity);
}
1647
1648 fn make_mock_with_embedding_and_chat(
1650 embedding: Vec<f32>,
1651 chat_responses: Vec<String>,
1652 ) -> zeph_llm::mock::MockProvider {
1653 let mut p = zeph_llm::mock::MockProvider::with_responses(chat_responses);
1654 p.embedding = embedding;
1655 p.supports_embeddings = true;
1656 p
1657 }
1658
1659 async fn seed_entity_with_vector(
1661 gs: &GraphStore,
1662 emb: &Arc<EmbeddingStore>,
1663 name: &str,
1664 entity_type: EntityType,
1665 summary: &str,
1666 vector: Vec<f32>,
1667 ) -> i64 {
1668 let id = gs
1669 .upsert_entity(name, name, entity_type, Some(summary))
1670 .await
1671 .unwrap();
1672 emb.ensure_named_collection(ENTITY_COLLECTION, u64::try_from(vector.len()).unwrap())
1673 .await
1674 .unwrap();
1675 let payload = serde_json::json!({
1676 "entity_id": id,
1677 "name": name,
1678 "entity_type": entity_type.as_str(),
1679 "summary": summary,
1680 });
1681 let point_id = emb
1682 .store_to_collection(ENTITY_COLLECTION, payload, vector)
1683 .await
1684 .unwrap();
1685 gs.set_entity_qdrant_point_id(id, &point_id).await.unwrap();
1686 id
1687 }
1688
1689 #[tokio::test]
1692 async fn resolve_ambiguous_score_llm_says_merge() {
1693 let (gs, emb) = setup_with_embedding().await;
1696 let existing_id = seed_entity_with_vector(
1697 &gs,
1698 &emb,
1699 "goroutine",
1700 EntityType::Concept,
1701 "go concurrency primitive",
1702 vec![1.0, 0.0, 0.0, 0.0],
1703 )
1704 .await;
1705
1706 let provider = make_mock_with_embedding_and_chat(
1708 vec![1.0, 1.0, 0.0, 0.0],
1709 vec![r#"{"same_entity": true}"#.to_owned()],
1710 );
1711 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1712
1713 let resolver = EntityResolver::new(&gs)
1714 .with_embedding_store(&emb)
1715 .with_provider(&any_provider)
1716 .with_thresholds(0.85, 0.50);
1717
1718 let (id, outcome) = resolver
1719 .resolve("goroutine concurrency", "concept", Some("go concurrency"))
1720 .await
1721 .unwrap();
1722
1723 assert_eq!(
1724 id, existing_id,
1725 "should return existing entity ID on LLM merge"
1726 );
1727 assert_eq!(outcome, ResolutionOutcome::LlmDisambiguated);
1728 }
1729
1730 #[tokio::test]
1733 async fn resolve_ambiguous_score_llm_says_different() {
1734 let (gs, emb) = setup_with_embedding().await;
1735 let existing_id = seed_entity_with_vector(
1736 &gs,
1737 &emb,
1738 "channel",
1739 EntityType::Concept,
1740 "go channel",
1741 vec![1.0, 0.0, 0.0, 0.0],
1742 )
1743 .await;
1744
1745 let provider = make_mock_with_embedding_and_chat(
1747 vec![1.0, 1.0, 0.0, 0.0],
1748 vec![r#"{"same_entity": false}"#.to_owned()],
1749 );
1750 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1751
1752 let resolver = EntityResolver::new(&gs)
1753 .with_embedding_store(&emb)
1754 .with_provider(&any_provider)
1755 .with_thresholds(0.85, 0.50);
1756
1757 let (id, outcome) = resolver
1758 .resolve("network channel", "concept", Some("networking channel"))
1759 .await
1760 .unwrap();
1761
1762 assert_ne!(
1763 id, existing_id,
1764 "LLM-rejected match should create new entity"
1765 );
1766 assert_eq!(outcome, ResolutionOutcome::Created);
1767 }
1768
1769 #[tokio::test]
1772 async fn resolve_ambiguous_score_llm_failure_increments_fallback() {
1773 let (gs, emb) = setup_with_embedding().await;
1774 let existing_id = seed_entity_with_vector(
1775 &gs,
1776 &emb,
1777 "mutex",
1778 EntityType::Concept,
1779 "mutual exclusion lock",
1780 vec![1.0, 0.0, 0.0, 0.0],
1781 )
1782 .await;
1783
1784 let mut provider = make_mock_with_embedding_and_chat(vec![1.0, 1.0, 0.0, 0.0], vec![]);
1786 provider.fail_chat = true;
1787 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1788
1789 let resolver = EntityResolver::new(&gs)
1790 .with_embedding_store(&emb)
1791 .with_provider(&any_provider)
1792 .with_thresholds(0.85, 0.50);
1793
1794 let fallback_count = resolver.fallback_count();
1795
1796 let (id, outcome) = resolver
1797 .resolve("mutex lock", "concept", Some("synchronization primitive"))
1798 .await
1799 .unwrap();
1800
1801 assert_ne!(
1803 id, existing_id,
1804 "LLM failure should create new entity (fallback)"
1805 );
1806 assert_eq!(outcome, ResolutionOutcome::Created);
1807 assert_eq!(
1808 fallback_count.load(std::sync::atomic::Ordering::Relaxed),
1809 1,
1810 "fallback counter should be incremented on LLM chat failure"
1811 );
1812 }
1813
1814 #[tokio::test]
1817 async fn resolve_creates_entity_with_canonical_name() {
1818 let gs = setup().await;
1819 let resolver = EntityResolver::new(&gs);
1820 let (id, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1821 assert!(id > 0);
1822 let entity = gs
1823 .find_entity("rust", EntityType::Language)
1824 .await
1825 .unwrap()
1826 .unwrap();
1827 assert_eq!(entity.canonical_name, "rust");
1828 }
1829
1830 #[tokio::test]
1831 async fn resolve_adds_alias_on_create() {
1832 let gs = setup().await;
1833 let resolver = EntityResolver::new(&gs);
1834 let (id, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1835 let aliases = gs.aliases_for_entity(id).await.unwrap();
1836 assert!(
1837 !aliases.is_empty(),
1838 "new entity should have at least one alias"
1839 );
1840 assert!(aliases.iter().any(|a| a.alias_name == "rust"));
1841 }
1842
1843 #[tokio::test]
1844 async fn resolve_reuses_entity_by_alias() {
1845 let gs = setup().await;
1846 let resolver = EntityResolver::new(&gs);
1847
1848 let (id1, _) = resolver.resolve("rust", "language", None).await.unwrap();
1850 gs.add_alias(id1, "rust-lang").await.unwrap();
1851
1852 let (id2, _) = resolver
1854 .resolve("rust-lang", "language", None)
1855 .await
1856 .unwrap();
1857 assert_eq!(
1858 id1, id2,
1859 "'rust-lang' alias should resolve to same entity as 'rust'"
1860 );
1861 }
1862
1863 #[tokio::test]
1864 async fn resolve_alias_match_respects_entity_type() {
1865 let gs = setup().await;
1866 let resolver = EntityResolver::new(&gs);
1867
1868 let (lang_id, _) = resolver.resolve("python", "language", None).await.unwrap();
1870
1871 let (tool_id, _) = resolver.resolve("python", "tool", None).await.unwrap();
1873 assert_ne!(
1874 lang_id, tool_id,
1875 "same name with different type should be separate entities"
1876 );
1877 }
1878
1879 #[tokio::test]
1880 async fn resolve_preserves_existing_aliases() {
1881 let gs = setup().await;
1882 let resolver = EntityResolver::new(&gs);
1883
1884 let (id, _) = resolver.resolve("rust", "language", None).await.unwrap();
1885 gs.add_alias(id, "rust-lang").await.unwrap();
1886
1887 resolver
1889 .resolve("rust", "language", Some("updated"))
1890 .await
1891 .unwrap();
1892 let aliases = gs.aliases_for_entity(id).await.unwrap();
1893 assert!(
1894 aliases.iter().any(|a| a.alias_name == "rust-lang"),
1895 "prior alias must be preserved"
1896 );
1897 }
1898
1899 #[tokio::test]
1900 async fn resolve_original_form_registered_as_alias() {
1901 let gs = setup().await;
1902 let resolver = EntityResolver::new(&gs);
1903
1904 let (id, _) = resolver
1907 .resolve(" Rust ", "language", None)
1908 .await
1909 .unwrap();
1910 let aliases = gs.aliases_for_entity(id).await.unwrap();
1911 assert!(aliases.iter().any(|a| a.alias_name == "rust"));
1912 }
1913
1914 #[tokio::test]
1915 async fn resolve_entity_with_many_aliases() {
1916 let gs = setup().await;
1917 let id = gs
1918 .upsert_entity("bigentity", "bigentity", EntityType::Concept, None)
1919 .await
1920 .unwrap();
1921 for i in 0..100 {
1922 gs.add_alias(id, &format!("alias-{i}")).await.unwrap();
1923 }
1924 let aliases = gs.aliases_for_entity(id).await.unwrap();
1925 assert_eq!(aliases.len(), 100);
1926
1927 let results = gs.find_entities_fuzzy("alias-50", 10).await.unwrap();
1929 assert!(results.iter().any(|e| e.id == id));
1930 }
1931}