1use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_llm::any::AnyProvider;
12use zeph_llm::provider::{LlmProvider as _, Message, Role};
13
14use super::store::GraphStore;
15use super::types::EntityType;
16use crate::embedding_store::EmbeddingStore;
17use crate::error::MemoryError;
18use crate::graph::extractor::ExtractedEntity;
19use crate::types::MessageId;
20use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
21
/// Shortest normalized entity name, in bytes, that `EntityResolver::resolve` accepts.
const MIN_ENTITY_NAME_BYTES: usize = 3;
/// Byte cap applied to normalized entity names.
const MAX_ENTITY_NAME_BYTES: usize = 512;
/// Byte cap applied to normalized edge relation labels.
const MAX_RELATION_BYTES: usize = 256;
/// Byte cap applied to edge facts and to summaries that are embedded or merged.
const MAX_FACT_BYTES: usize = 2048;

/// Name of the vector-store collection that holds entity embeddings.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Upper bound, in seconds, on a single embedding request.
const EMBED_TIMEOUT_SECS: u64 = 30;
36
/// Returns `s` without Unicode control characters and without the BiDi
/// embedding/override/isolate formatting characters (U+202A–U+202E, U+2066–U+2069).
///
/// All other characters, including non-ASCII text, are kept in their original order.
fn strip_control_chars(s: &str) -> String {
    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        let is_bidi_format = matches!(ch, '\u{202A}'..='\u{202E}' | '\u{2066}'..='\u{2069}');
        if ch.is_control() || is_bidi_format {
            continue;
        }
        cleaned.push(ch);
    }
    cleaned
}
43
/// Returns the longest prefix of `s` that is at most `max_bytes` long and ends on a
/// UTF-8 character boundary.
///
/// The input is returned unchanged when it already fits.
fn truncate_to_bytes(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    // Walk backwards from the byte limit to the nearest character boundary.
    // Index 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
55
/// How `EntityResolver::resolve` arrived at the entity ID it returned.
#[derive(Debug, Clone, PartialEq)]
pub enum ResolutionOutcome {
    /// An existing entity was found by alias or by normalized name.
    ExactMatch,
    /// The best vector-search candidate scored at or above the similarity threshold
    /// and was merged; `score` is that candidate's score.
    EmbeddingMatch { score: f32 },
    /// The best candidate fell in the ambiguous band and the LLM judged it to be
    /// the same entity.
    LlmDisambiguated,
    /// No existing entity was matched; the entity was upserted under its own
    /// normalized name.
    Created,
}
68
69#[derive(Debug, Deserialize, JsonSchema)]
71struct DisambiguationResponse {
72 same_entity: bool,
73}
74
75type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
84
85pub struct EntityResolver<'a> {
86 store: &'a GraphStore,
87 embedding_store: Option<&'a Arc<EmbeddingStore>>,
88 provider: Option<&'a AnyProvider>,
89 similarity_threshold: f32,
90 ambiguous_threshold: f32,
91 name_locks: NameLockMap,
92 fallback_count: Arc<std::sync::atomic::AtomicU64>,
94}
95
96impl<'a> EntityResolver<'a> {
97 #[must_use]
98 pub fn new(store: &'a GraphStore) -> Self {
99 Self {
100 store,
101 embedding_store: None,
102 provider: None,
103 similarity_threshold: 0.85,
104 ambiguous_threshold: 0.70,
105 name_locks: Arc::new(DashMap::new()),
106 fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
107 }
108 }
109
110 #[must_use]
111 pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
112 self.embedding_store = Some(store);
113 self
114 }
115
116 #[must_use]
117 pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
118 self.provider = Some(provider);
119 self
120 }
121
122 #[must_use]
123 pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
124 self.similarity_threshold = similarity;
125 self.ambiguous_threshold = ambiguous;
126 self
127 }
128
129 #[must_use]
131 pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
132 Arc::clone(&self.fallback_count)
133 }
134
135 fn normalize_name(name: &str) -> String {
137 let lowered = name.trim().to_lowercase();
138 let cleaned = strip_control_chars(&lowered);
139 let normalized = truncate_to_bytes(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
140 if normalized.len() < cleaned.len() {
141 tracing::debug!(
142 "graph resolver: entity name truncated to {} bytes",
143 MAX_ENTITY_NAME_BYTES
144 );
145 }
146 normalized
147 }
148
149 fn parse_entity_type(entity_type: &str) -> EntityType {
151 entity_type
152 .trim()
153 .to_lowercase()
154 .parse::<EntityType>()
155 .unwrap_or_else(|_| {
156 tracing::debug!(
157 "graph resolver: unknown entity type {:?}, falling back to Concept",
158 entity_type
159 );
160 EntityType::Concept
161 })
162 }
163
164 async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
166 let lock = self
167 .name_locks
168 .entry(normalized.to_owned())
169 .or_insert_with(|| Arc::new(Mutex::new(())))
170 .clone();
171 lock.lock_owned().await
172 }
173
174 pub async fn resolve(
193 &self,
194 name: &str,
195 entity_type: &str,
196 summary: Option<&str>,
197 ) -> Result<(i64, ResolutionOutcome), MemoryError> {
198 let normalized = Self::normalize_name(name);
199
200 if normalized.is_empty() {
201 return Err(MemoryError::GraphStore("empty entity name".into()));
202 }
203
204 if normalized.len() < MIN_ENTITY_NAME_BYTES {
205 return Err(MemoryError::GraphStore(format!(
206 "entity name too short: {normalized:?} ({} bytes, min {MIN_ENTITY_NAME_BYTES})",
207 normalized.len()
208 )));
209 }
210
211 let et = Self::parse_entity_type(entity_type);
212
213 let surface_name = name.trim().to_owned();
215
216 let _guard = self.lock_name(&normalized).await;
218
219 if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
221 self.store
222 .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
223 .await?;
224 return Ok((entity.id, ResolutionOutcome::ExactMatch));
225 }
226
227 if let Some(entity) = self.store.find_entity(&normalized, et).await? {
229 self.store
230 .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
231 .await?;
232 return Ok((entity.id, ResolutionOutcome::ExactMatch));
233 }
234
235 if let Some(outcome) = self
237 .resolve_via_embedding(&normalized, name, &surface_name, et, summary)
238 .await?
239 {
240 return Ok(outcome);
241 }
242
243 let entity_id = self
245 .store
246 .upsert_entity(&surface_name, &normalized, et, summary)
247 .await?;
248
249 self.register_aliases(entity_id, &normalized, name).await?;
250
251 Ok((entity_id, ResolutionOutcome::Created))
252 }
253
254 async fn embed_entity_text(
257 &self,
258 provider: &AnyProvider,
259 normalized: &str,
260 summary: Option<&str>,
261 ) -> Option<Vec<f32>> {
262 let safe_summary = truncate_to_bytes(summary.unwrap_or(""), MAX_FACT_BYTES);
263 let embed_text = format!("{normalized}: {safe_summary}");
264 let embed_result = tokio::time::timeout(
265 std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
266 provider.embed(&embed_text),
267 )
268 .await;
269 match embed_result {
270 Ok(Ok(v)) => Some(v),
271 Ok(Err(err)) => {
272 self.fallback_count
273 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
274 tracing::warn!(entity_name = %normalized, error = %err,
275 "embed() failed; falling back to exact-match-only entity creation");
276 None
277 }
278 Err(_timeout) => {
279 self.fallback_count
280 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
281 tracing::warn!(entity_name = %normalized,
282 "embed() timed out after {}s; falling back to create new entity",
283 EMBED_TIMEOUT_SECS);
284 None
285 }
286 }
287 }
288
289 #[allow(clippy::too_many_arguments)]
292 async fn handle_ambiguous_candidate(
293 &self,
294 emb_store: &EmbeddingStore,
295 provider: &AnyProvider,
296 payload: &std::collections::HashMap<String, serde_json::Value>,
297 score: f32,
298 surface_name: &str,
299 normalized: &str,
300 et: EntityType,
301 summary: Option<&str>,
302 ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
303 let entity_id = payload
304 .get("entity_id")
305 .and_then(serde_json::Value::as_i64)
306 .ok_or_else(|| MemoryError::GraphStore("missing entity_id in payload".into()))?;
307 let existing_name = payload
308 .get("name")
309 .and_then(|v| v.as_str())
310 .unwrap_or("")
311 .to_owned();
312 let existing_summary = payload
313 .get("summary")
314 .and_then(|v| v.as_str())
315 .unwrap_or("")
316 .to_owned();
317 let existing_type = payload
319 .get("entity_type")
320 .and_then(|v| v.as_str())
321 .unwrap_or(et.as_str())
322 .to_owned();
323 match self
324 .llm_disambiguate(
325 provider,
326 normalized,
327 et.as_str(),
328 summary.unwrap_or(""),
329 &existing_name,
330 &existing_type,
331 &existing_summary,
332 score,
333 )
334 .await
335 {
336 Some(true) => {
337 self.merge_entity(
338 emb_store,
339 provider,
340 entity_id,
341 surface_name,
342 normalized,
343 et,
344 summary,
345 )
346 .await?;
347 Ok(Some((entity_id, ResolutionOutcome::LlmDisambiguated)))
348 }
349 Some(false) => Ok(None),
350 None => {
351 self.fallback_count
352 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
353 tracing::warn!(entity_name = %normalized,
354 "LLM disambiguation failed; falling back to create new entity");
355 Ok(None)
356 }
357 }
358 }
359
360 async fn resolve_via_embedding(
363 &self,
364 normalized: &str,
365 original_name: &str,
366 surface_name: &str,
367 et: EntityType,
368 summary: Option<&str>,
369 ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
370 let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) else {
371 return Ok(None);
372 };
373
374 let Some(query_vec) = self.embed_entity_text(provider, normalized, summary).await else {
375 return Ok(None);
376 };
377
378 let type_filter = VectorFilter {
379 must: vec![FieldCondition {
380 field: "entity_type".into(),
381 value: FieldValue::Text(et.as_str().to_owned()),
382 }],
383 must_not: vec![],
384 };
385 let candidates = match emb_store
386 .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
387 .await
388 {
389 Ok(c) => c,
390 Err(err) => {
391 self.fallback_count
392 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
393 tracing::warn!(entity_name = %normalized, error = %err,
394 "Qdrant search failed; falling back to create new entity");
395 return self
396 .create_with_embedding(
397 emb_store,
398 surface_name,
399 normalized,
400 original_name,
401 et,
402 summary,
403 &query_vec,
404 )
405 .await
406 .map(Some);
407 }
408 };
409
410 if let Some(best) = candidates.first() {
411 let score = best.score;
412 if score >= self.similarity_threshold {
413 let entity_id = best
414 .payload
415 .get("entity_id")
416 .and_then(serde_json::Value::as_i64)
417 .ok_or_else(|| {
418 MemoryError::GraphStore("missing entity_id in payload".into())
419 })?;
420 self.merge_entity(
421 emb_store,
422 provider,
423 entity_id,
424 surface_name,
425 normalized,
426 et,
427 summary,
428 )
429 .await?;
430 return Ok(Some((
431 entity_id,
432 ResolutionOutcome::EmbeddingMatch { score },
433 )));
434 } else if score >= self.ambiguous_threshold
435 && let Some(result) = self
436 .handle_ambiguous_candidate(
437 emb_store,
438 provider,
439 &best.payload,
440 score,
441 surface_name,
442 normalized,
443 et,
444 summary,
445 )
446 .await?
447 {
448 return Ok(Some(result));
449 }
450 }
452
453 self.create_with_embedding(
455 emb_store,
456 surface_name,
457 normalized,
458 original_name,
459 et,
460 summary,
461 &query_vec,
462 )
463 .await
464 .map(Some)
465 }
466
467 #[allow(clippy::too_many_arguments)]
469 async fn create_with_embedding(
470 &self,
471 emb_store: &EmbeddingStore,
472 surface_name: &str,
473 normalized: &str,
474 original_name: &str,
475 et: EntityType,
476 summary: Option<&str>,
477 query_vec: &[f32],
478 ) -> Result<(i64, ResolutionOutcome), MemoryError> {
479 let entity_id = self
480 .store
481 .upsert_entity(surface_name, normalized, et, summary)
482 .await?;
483 self.register_aliases(entity_id, normalized, original_name)
484 .await?;
485 self.store_entity_embedding(
486 emb_store,
487 entity_id,
488 None,
489 normalized,
490 et,
491 summary.unwrap_or(""),
492 query_vec,
493 )
494 .await;
495 Ok((entity_id, ResolutionOutcome::Created))
496 }
497
498 async fn register_aliases(
500 &self,
501 entity_id: i64,
502 normalized: &str,
503 original_name: &str,
504 ) -> Result<(), MemoryError> {
505 self.store.add_alias(entity_id, normalized).await?;
506
507 let original_trimmed = original_name.trim().to_lowercase();
510 let original_clean_str = strip_control_chars(&original_trimmed);
511 let original_clean = truncate_to_bytes(&original_clean_str, MAX_ENTITY_NAME_BYTES);
512 if original_clean != normalized {
513 self.store.add_alias(entity_id, original_clean).await?;
514 }
515
516 Ok(())
517 }
518
519 #[allow(clippy::too_many_arguments)]
521 async fn merge_entity(
522 &self,
523 emb_store: &EmbeddingStore,
524 provider: &AnyProvider,
525 entity_id: i64,
526 new_surface_name: &str,
527 new_canonical_name: &str,
528 entity_type: EntityType,
529 new_summary: Option<&str>,
530 ) -> Result<(), MemoryError> {
531 let existing = self.store.find_entity_by_id(entity_id).await?;
534 let existing_summary = existing
535 .as_ref()
536 .and_then(|e| e.summary.as_deref())
537 .unwrap_or("");
538
539 let merged_summary = if let Some(new) = new_summary {
540 if !new.is_empty() && !existing_summary.is_empty() {
541 let combined = format!("{existing_summary}; {new}");
542 truncate_to_bytes(&combined, MAX_FACT_BYTES).to_owned()
544 } else if !new.is_empty() {
545 new.to_owned()
546 } else {
547 existing_summary.to_owned()
548 }
549 } else {
550 existing_summary.to_owned()
551 };
552
553 let summary_opt = if merged_summary.is_empty() {
554 None
555 } else {
556 Some(merged_summary.as_str())
557 };
558
559 let existing_canonical = existing.as_ref().map_or_else(
561 || new_canonical_name.to_owned(),
562 |e| e.canonical_name.clone(),
563 );
564 let existing_name_owned = existing
565 .as_ref()
566 .map_or_else(|| new_surface_name.to_owned(), |e| e.name.clone());
567 self.store
568 .upsert_entity(
569 &existing_name_owned,
570 &existing_canonical,
571 entity_type,
572 summary_opt,
573 )
574 .await?;
575
576 let existing_point_id = existing
578 .as_ref()
579 .and_then(|e| e.qdrant_point_id.as_deref())
580 .map(ToOwned::to_owned);
581
582 let embed_text = format!("{existing_name_owned}: {merged_summary}");
584 let embed_result = tokio::time::timeout(
585 std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
586 provider.embed(&embed_text),
587 )
588 .await;
589
590 match embed_result {
591 Ok(Ok(vec)) => {
592 self.store_entity_embedding(
593 emb_store,
594 entity_id,
595 existing_point_id.as_deref(),
596 &existing_name_owned,
597 entity_type,
598 &merged_summary,
599 &vec,
600 )
601 .await;
602 }
603 Ok(Err(err)) => {
604 tracing::warn!(
605 entity_id,
606 error = %err,
607 "merge re-embed failed; Qdrant entry may be stale"
608 );
609 }
610 Err(_) => {
611 tracing::warn!(
612 entity_id,
613 "merge re-embed timed out; Qdrant entry may be stale"
614 );
615 }
616 }
617
618 Ok(())
619 }
620
621 #[allow(clippy::too_many_arguments)]
629 async fn store_entity_embedding(
630 &self,
631 emb_store: &EmbeddingStore,
632 entity_id: i64,
633 existing_point_id: Option<&str>,
634 name: &str,
635 entity_type: EntityType,
636 summary: &str,
637 vector: &[f32],
638 ) {
639 let vector_size = u64::try_from(vector.len()).unwrap_or(384);
643 if let Err(err) = emb_store
644 .ensure_named_collection(ENTITY_COLLECTION, vector_size)
645 .await
646 {
647 tracing::error!(
648 error = %err,
649 "failed to ensure entity embedding collection; skipping Qdrant upsert"
650 );
651 return;
652 }
653
654 let payload = serde_json::json!({
655 "entity_id": entity_id,
656 "name": name,
657 "entity_type": entity_type.as_str(),
658 "summary": summary,
659 });
660
661 if let Some(point_id) = existing_point_id {
662 if let Err(err) = emb_store
664 .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
665 .await
666 {
667 tracing::warn!(
668 entity_id,
669 error = %err,
670 "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
671 );
672 }
673 } else {
674 match emb_store
675 .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
676 .await
677 {
678 Ok(point_id) => {
679 if let Err(err) = self
680 .store
681 .set_entity_qdrant_point_id(entity_id, &point_id)
682 .await
683 {
684 tracing::warn!(
685 entity_id,
686 error = %err,
687 "failed to store qdrant_point_id in SQLite"
688 );
689 }
690 }
691 Err(err) => {
692 tracing::warn!(
693 entity_id,
694 error = %err,
695 "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
696 );
697 }
698 }
699 }
700 }
701
702 #[allow(clippy::too_many_arguments)]
706 async fn llm_disambiguate(
707 &self,
708 provider: &AnyProvider,
709 new_name: &str,
710 new_type: &str,
711 new_summary: &str,
712 existing_name: &str,
713 existing_type: &str,
714 existing_summary: &str,
715 score: f32,
716 ) -> Option<bool> {
717 let prompt = format!(
718 "New entity:\n\
719 - Name: <external-data>{new_name}</external-data>\n\
720 - Type: <external-data>{new_type}</external-data>\n\
721 - Summary: <external-data>{new_summary}</external-data>\n\
722 \n\
723 Existing entity:\n\
724 - Name: <external-data>{existing_name}</external-data>\n\
725 - Type: <external-data>{existing_type}</external-data>\n\
726 - Summary: <external-data>{existing_summary}</external-data>\n\
727 \n\
728 Cosine similarity: {score:.2}\n\
729 \n\
730 Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
731 );
732
733 let messages = [
734 Message::from_legacy(
735 Role::System,
736 "You are an entity disambiguation assistant. Given a new entity mention and \
737 an existing entity from the knowledge graph, determine if they refer to the same \
738 real-world entity. Respond only with JSON.",
739 ),
740 Message::from_legacy(Role::User, prompt),
741 ];
742
743 let response = match provider.chat(&messages).await {
744 Ok(r) => r,
745 Err(err) => {
746 tracing::warn!(error = %err, "LLM disambiguation chat failed");
747 return None;
748 }
749 };
750
751 let json_str = extract_json(&response);
753 match serde_json::from_str::<DisambiguationResponse>(json_str) {
754 Ok(parsed) => Some(parsed.same_entity),
755 Err(err) => {
756 tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
757 None
758 }
759 }
760 }
761
762 pub async fn resolve_batch(
775 &self,
776 entities: &[ExtractedEntity],
777 ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
778 if entities.is_empty() {
779 return Ok(Vec::new());
780 }
781
782 let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
784
785 let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
786 let name = e.name.clone();
787 let entity_type = e.entity_type.clone();
788 let summary = e.summary.clone();
789 async move {
790 let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
791 (i, result)
792 }
793 }))
794 .buffer_unordered(4);
795
796 while let Some((i, result)) = stream.next().await {
797 match result {
798 Ok(outcome) => results[i] = Some(outcome),
799 Err(err) => return Err(err),
800 }
801 }
802
803 Ok(results
804 .into_iter()
805 .enumerate()
806 .map(|(i, r)| {
807 r.unwrap_or_else(|| {
808 tracing::warn!(
809 index = i,
810 "resolve_batch: missing result at index — bug in stream collection"
811 );
812 panic!("resolve_batch: missing result at index {i}")
813 })
814 })
815 .collect())
816 }
817
818 pub async fn resolve_edge(
832 &self,
833 source_id: i64,
834 target_id: i64,
835 relation: &str,
836 fact: &str,
837 confidence: f32,
838 episode_id: Option<MessageId>,
839 ) -> Result<Option<i64>, MemoryError> {
840 let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
841 let normalized_relation = truncate_to_bytes(&relation_clean, MAX_RELATION_BYTES).to_owned();
842
843 let fact_clean = strip_control_chars(fact.trim());
844 let normalized_fact = truncate_to_bytes(&fact_clean, MAX_FACT_BYTES).to_owned();
845
846 let existing_edges = self.store.edges_exact(source_id, target_id).await?;
848
849 let matching = existing_edges
850 .iter()
851 .find(|e| e.relation == normalized_relation);
852
853 if let Some(old) = matching {
854 if old.fact == normalized_fact {
855 return Ok(None);
857 }
858 self.store.invalidate_edge(old.id).await?;
860 }
861
862 let new_id = self
863 .store
864 .insert_edge(
865 source_id,
866 target_id,
867 &normalized_relation,
868 &normalized_fact,
869 confidence,
870 episode_id,
871 )
872 .await?;
873 Ok(Some(new_id))
874 }
875}
876
/// Extracts the JSON payload from an LLM reply.
///
/// Handles, in order:
/// 1. a reply that starts with a Markdown code fence (```` ``` ````) and contains a
///    closing fence — the fenced text is returned, minus a leading `json`
///    language tag in any letter case (`json`, `JSON`, `Json`, …);
/// 2. a reply containing `{` … `}` — the span from the first `{` to the last `}`;
/// 3. anything else — the trimmed input.
///
/// The result is not validated; callers are expected to parse it.
fn extract_json(s: &str) -> &str {
    let trimmed = s.trim();

    let fenced = trimmed.strip_prefix("```").and_then(|after_open| {
        after_open
            .rfind("```")
            .map(|close| &after_open[..close])
    });
    if let Some(fenced) = fenced {
        // `get` returns `None` when the text is shorter than four bytes or byte 4
        // is not a character boundary, so this never panics on non-ASCII input.
        let has_json_tag = fenced
            .get(..4)
            .is_some_and(|tag| tag.eq_ignore_ascii_case("json"));
        let body = if has_json_tag { &fenced[4..] } else { fenced };
        return body.trim();
    }

    match (trimmed.find('{'), trimmed.rfind('}')) {
        (Some(start), Some(end)) if start <= end => &trimmed[start..=end],
        _ => trimmed,
    }
}
899
900#[cfg(test)]
901mod tests {
902 use std::sync::Arc;
903
904 use super::*;
905 use crate::in_memory_store::InMemoryVectorStore;
906 use crate::sqlite::SqliteStore;
907
908 async fn setup() -> GraphStore {
909 let store = SqliteStore::new(":memory:").await.unwrap();
910 GraphStore::new(store.pool().clone())
911 }
912
913 async fn setup_with_embedding() -> (GraphStore, Arc<EmbeddingStore>) {
914 let sqlite = SqliteStore::new(":memory:").await.unwrap();
915 let pool = sqlite.pool().clone();
916 let mem_store = Box::new(InMemoryVectorStore::new());
917 let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool));
918 let gs = GraphStore::new(sqlite.pool().clone());
919 (gs, emb)
920 }
921
922 fn make_mock_provider_with_embedding(embedding: Vec<f32>) -> zeph_llm::mock::MockProvider {
923 let mut p = zeph_llm::mock::MockProvider::default();
924 p.embedding = embedding;
925 p.supports_embeddings = true;
926 p
927 }
928
929 #[tokio::test]
932 async fn resolve_creates_new_entity() {
933 let gs = setup().await;
934 let resolver = EntityResolver::new(&gs);
935 let (id, outcome) = resolver
936 .resolve("alice", "person", Some("a person"))
937 .await
938 .unwrap();
939 assert!(id > 0);
940 assert_eq!(outcome, ResolutionOutcome::Created);
941 }
942
943 #[tokio::test]
944 async fn resolve_updates_existing_entity() {
945 let gs = setup().await;
946 let resolver = EntityResolver::new(&gs);
947 let (id1, _) = resolver.resolve("alice", "person", None).await.unwrap();
948 let (id2, outcome) = resolver
949 .resolve("alice", "person", Some("updated summary"))
950 .await
951 .unwrap();
952 assert_eq!(id1, id2);
953 assert_eq!(outcome, ResolutionOutcome::ExactMatch);
954
955 let entity = gs
956 .find_entity("alice", EntityType::Person)
957 .await
958 .unwrap()
959 .unwrap();
960 assert_eq!(entity.summary.as_deref(), Some("updated summary"));
961 }
962
963 #[tokio::test]
964 async fn resolve_unknown_type_falls_back_to_concept() {
965 let gs = setup().await;
966 let resolver = EntityResolver::new(&gs);
967 let (id, _) = resolver
968 .resolve("my_thing", "unknown_type", None)
969 .await
970 .unwrap();
971 assert!(id > 0);
972
973 let entity = gs
975 .find_entity("my_thing", EntityType::Concept)
976 .await
977 .unwrap();
978 assert!(entity.is_some());
979 }
980
981 #[tokio::test]
982 async fn resolve_empty_name_returns_error() {
983 let gs = setup().await;
984 let resolver = EntityResolver::new(&gs);
985
986 let result_empty = resolver.resolve("", "concept", None).await;
987 assert!(result_empty.is_err());
988 assert!(matches!(
989 result_empty.unwrap_err(),
990 MemoryError::GraphStore(_)
991 ));
992
993 let result_whitespace = resolver.resolve(" ", "concept", None).await;
994 assert!(result_whitespace.is_err());
995 }
996
997 #[tokio::test]
999 async fn resolve_short_name_below_min_returns_error() {
1000 let gs = setup().await;
1001 let resolver = EntityResolver::new(&gs);
1002
1003 let err_go = resolver.resolve("go", "technology", None).await;
1005 assert!(err_go.is_err(), "\"go\" (2 bytes) must be rejected");
1006 assert!(
1007 matches!(err_go.unwrap_err(), MemoryError::GraphStore(_)),
1008 "expected GraphStore error for short name"
1009 );
1010
1011 let err_cd = resolver.resolve("cd", "concept", None).await;
1012 assert!(err_cd.is_err(), "\"cd\" (2 bytes) must be rejected");
1013 }
1014
1015 #[tokio::test]
1016 async fn resolve_name_at_min_length_passes() {
1017 let gs = setup().await;
1018 let resolver = EntityResolver::new(&gs);
1019
1020 let result = resolver.resolve("git", "technology", None).await;
1022 assert!(
1023 result.is_ok(),
1024 "\"git\" (3 bytes) must pass min-length check"
1025 );
1026 }
1027
1028 #[tokio::test]
1029 async fn resolve_case_insensitive() {
1030 let gs = setup().await;
1031 let resolver = EntityResolver::new(&gs);
1032
1033 let (id1, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1034 let (id2, outcome) = resolver.resolve("rust", "language", None).await.unwrap();
1035 assert_eq!(
1036 id1, id2,
1037 "'Rust' and 'rust' should resolve to the same entity"
1038 );
1039 assert_eq!(outcome, ResolutionOutcome::ExactMatch);
1040 }
1041
1042 #[tokio::test]
1043 async fn resolve_edge_inserts_new() {
1044 let gs = setup().await;
1045 let resolver = EntityResolver::new(&gs);
1046
1047 let src = gs
1048 .upsert_entity("src", "src", EntityType::Concept, None)
1049 .await
1050 .unwrap();
1051 let tgt = gs
1052 .upsert_entity("tgt", "tgt", EntityType::Concept, None)
1053 .await
1054 .unwrap();
1055
1056 let result = resolver
1057 .resolve_edge(src, tgt, "uses", "src uses tgt", 0.9, None)
1058 .await
1059 .unwrap();
1060 assert!(result.is_some());
1061 assert!(result.unwrap() > 0);
1062 }
1063
1064 #[tokio::test]
1065 async fn resolve_edge_deduplicates_identical() {
1066 let gs = setup().await;
1067 let resolver = EntityResolver::new(&gs);
1068
1069 let src = gs
1070 .upsert_entity("a", "a", EntityType::Concept, None)
1071 .await
1072 .unwrap();
1073 let tgt = gs
1074 .upsert_entity("b", "b", EntityType::Concept, None)
1075 .await
1076 .unwrap();
1077
1078 let first = resolver
1079 .resolve_edge(src, tgt, "uses", "a uses b", 0.9, None)
1080 .await
1081 .unwrap();
1082 assert!(first.is_some());
1083
1084 let second = resolver
1085 .resolve_edge(src, tgt, "uses", "a uses b", 0.9, None)
1086 .await
1087 .unwrap();
1088 assert!(second.is_none(), "identical edge should be deduplicated");
1089 }
1090
1091 #[tokio::test]
1092 async fn resolve_edge_supersedes_contradictory() {
1093 let gs = setup().await;
1094 let resolver = EntityResolver::new(&gs);
1095
1096 let src = gs
1097 .upsert_entity("x", "x", EntityType::Concept, None)
1098 .await
1099 .unwrap();
1100 let tgt = gs
1101 .upsert_entity("y", "y", EntityType::Concept, None)
1102 .await
1103 .unwrap();
1104
1105 let first_id = resolver
1106 .resolve_edge(src, tgt, "prefers", "x prefers y (old)", 0.8, None)
1107 .await
1108 .unwrap()
1109 .unwrap();
1110
1111 let second_id = resolver
1112 .resolve_edge(src, tgt, "prefers", "x prefers y (new)", 0.9, None)
1113 .await
1114 .unwrap()
1115 .unwrap();
1116
1117 assert_ne!(first_id, second_id, "superseded edge should have a new ID");
1118
1119 let active_count = gs.active_edge_count().await.unwrap();
1121 assert_eq!(active_count, 1, "only new edge should be active");
1122 }
1123
1124 #[tokio::test]
1125 async fn resolve_edge_direction_sensitive() {
1126 let gs = setup().await;
1128 let resolver = EntityResolver::new(&gs);
1129
1130 let a = gs
1131 .upsert_entity("node_a", "node_a", EntityType::Concept, None)
1132 .await
1133 .unwrap();
1134 let b = gs
1135 .upsert_entity("node_b", "node_b", EntityType::Concept, None)
1136 .await
1137 .unwrap();
1138
1139 let id1 = resolver
1141 .resolve_edge(a, b, "uses", "A uses B", 0.9, None)
1142 .await
1143 .unwrap();
1144 assert!(id1.is_some());
1145
1146 let id2 = resolver
1148 .resolve_edge(b, a, "uses", "B uses A (different direction)", 0.9, None)
1149 .await
1150 .unwrap();
1151 assert!(id2.is_some());
1152
1153 let active_count = gs.active_edge_count().await.unwrap();
1155 assert_eq!(active_count, 2, "both directional edges should be active");
1156 }
1157
1158 #[tokio::test]
1159 async fn resolve_edge_normalizes_relation_case() {
1160 let gs = setup().await;
1161 let resolver = EntityResolver::new(&gs);
1162
1163 let src = gs
1164 .upsert_entity("p", "p", EntityType::Concept, None)
1165 .await
1166 .unwrap();
1167 let tgt = gs
1168 .upsert_entity("q", "q", EntityType::Concept, None)
1169 .await
1170 .unwrap();
1171
1172 let id1 = resolver
1174 .resolve_edge(src, tgt, "Uses", "p uses q", 0.9, None)
1175 .await
1176 .unwrap();
1177 assert!(id1.is_some());
1178
1179 let id2 = resolver
1181 .resolve_edge(src, tgt, "uses", "p uses q", 0.9, None)
1182 .await
1183 .unwrap();
1184 assert!(id2.is_none(), "normalized relations should deduplicate");
1185 }
1186
1187 #[tokio::test]
1190 async fn resolve_entity_type_uppercase_parsed_correctly() {
1191 let gs = setup().await;
1192 let resolver = EntityResolver::new(&gs);
1193
1194 let (id, _) = resolver
1196 .resolve("test_entity", "Person", None)
1197 .await
1198 .unwrap();
1199 assert!(id > 0);
1200
1201 let entity = gs
1202 .find_entity("test_entity", EntityType::Person)
1203 .await
1204 .unwrap();
1205 assert!(entity.is_some(), "entity should be stored as Person type");
1206 }
1207
1208 #[tokio::test]
1209 async fn resolve_entity_type_all_caps_parsed_correctly() {
1210 let gs = setup().await;
1211 let resolver = EntityResolver::new(&gs);
1212
1213 let (id, _) = resolver.resolve("my_lang", "LANGUAGE", None).await.unwrap();
1214 assert!(id > 0);
1215
1216 let entity = gs
1217 .find_entity("my_lang", EntityType::Language)
1218 .await
1219 .unwrap();
1220 assert!(entity.is_some(), "entity should be stored as Language type");
1221 }
1222
1223 #[tokio::test]
1226 async fn resolve_truncates_long_entity_name() {
1227 let gs = setup().await;
1228 let resolver = EntityResolver::new(&gs);
1229
1230 let long_name = "a".repeat(1024);
1231 let (id, _) = resolver.resolve(&long_name, "concept", None).await.unwrap();
1232 assert!(id > 0);
1233
1234 let entity = gs
1236 .find_entity(&"a".repeat(512), EntityType::Concept)
1237 .await
1238 .unwrap();
1239 assert!(entity.is_some(), "truncated name should be stored");
1240 }
1241
1242 #[tokio::test]
1245 async fn resolve_strips_control_chars_from_name() {
1246 let gs = setup().await;
1247 let resolver = EntityResolver::new(&gs);
1248
1249 let name_with_ctrl = "rust\x00lang";
1251 let (id, _) = resolver
1252 .resolve(name_with_ctrl, "language", None)
1253 .await
1254 .unwrap();
1255 assert!(id > 0);
1256
1257 let entity = gs
1259 .find_entity("rustlang", EntityType::Language)
1260 .await
1261 .unwrap();
1262 assert!(
1263 entity.is_some(),
1264 "control chars should be stripped from stored name"
1265 );
1266 }
1267
1268 #[tokio::test]
1269 async fn resolve_strips_bidi_overrides_from_name() {
1270 let gs = setup().await;
1271 let resolver = EntityResolver::new(&gs);
1272
1273 let name_with_bidi = "rust\u{202E}lang";
1275 let (id, _) = resolver
1276 .resolve(name_with_bidi, "language", None)
1277 .await
1278 .unwrap();
1279 assert!(id > 0);
1280
1281 let entity = gs
1282 .find_entity("rustlang", EntityType::Language)
1283 .await
1284 .unwrap();
1285 assert!(entity.is_some(), "BiDi override chars should be stripped");
1286 }
1287
1288 #[test]
1291 fn strip_control_chars_removes_ascii_controls() {
1292 assert_eq!(strip_control_chars("hello\x00world"), "helloworld");
1293 assert_eq!(strip_control_chars("tab\there"), "tabhere");
1294 assert_eq!(strip_control_chars("new\nline"), "newline");
1295 }
1296
1297 #[test]
1298 fn strip_control_chars_removes_bidi() {
1299 let bidi = "\u{202E}spoof";
1300 assert_eq!(strip_control_chars(bidi), "spoof");
1301 }
1302
1303 #[test]
1304 fn strip_control_chars_preserves_normal_unicode() {
1305 assert_eq!(strip_control_chars("привет мир"), "привет мир");
1306 assert_eq!(strip_control_chars("日本語"), "日本語");
1307 }
1308
1309 #[test]
1310 fn truncate_to_bytes_exact_boundary() {
1311 let s = "hello";
1312 assert_eq!(truncate_to_bytes(s, 5), "hello");
1313 assert_eq!(truncate_to_bytes(s, 3), "hel");
1314 }
1315
1316 #[test]
1317 fn truncate_to_bytes_respects_utf8_boundary() {
1318 let s = "élan";
1320 let truncated = truncate_to_bytes(s, 1);
1321 assert!(s.is_char_boundary(truncated.len()));
1322 }
1323
1324 #[tokio::test]
1327 async fn resolve_with_embedding_store_score_above_threshold_merges() {
1328 let (gs, emb) = setup_with_embedding().await;
1329 let existing_id = gs
1333 .upsert_entity(
1334 "python programming lang",
1335 "python programming lang",
1336 EntityType::Language,
1337 Some("a programming language"),
1338 )
1339 .await
1340 .unwrap();
1341
1342 let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1343 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1344 .await
1345 .unwrap();
1346 let payload = serde_json::json!({
1347 "entity_id": existing_id,
1348 "name": "python programming lang",
1349 "entity_type": "language",
1350 "summary": "a programming language",
1351 });
1352 let point_id = emb
1353 .store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1354 .await
1355 .unwrap();
1356 gs.set_entity_qdrant_point_id(existing_id, &point_id)
1357 .await
1358 .unwrap();
1359
1360 let provider = make_mock_provider_with_embedding(mock_vec);
1362 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1363
1364 let resolver = EntityResolver::new(&gs)
1365 .with_embedding_store(&emb)
1366 .with_provider(&any_provider)
1367 .with_thresholds(0.85, 0.70);
1368
1369 let (id, outcome) = resolver
1371 .resolve(
1372 "python scripting lang",
1373 "language",
1374 Some("scripting language"),
1375 )
1376 .await
1377 .unwrap();
1378
1379 assert_eq!(id, existing_id, "should return existing entity ID on merge");
1380 assert!(
1381 matches!(outcome, ResolutionOutcome::EmbeddingMatch { score } if score > 0.85),
1382 "outcome should be EmbeddingMatch with score > 0.85, got {outcome:?}"
1383 );
1384 }
1385
1386 #[tokio::test]
1387 async fn resolve_with_embedding_store_score_below_ambiguous_creates_new() {
1388 let (gs, emb) = setup_with_embedding().await;
1389 let existing_id = gs
1391 .upsert_entity("java", "java", EntityType::Language, Some("java language"))
1392 .await
1393 .unwrap();
1394
1395 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1397 .await
1398 .unwrap();
1399 let payload = serde_json::json!({
1400 "entity_id": existing_id,
1401 "name": "java",
1402 "entity_type": "language",
1403 "summary": "java language",
1404 });
1405 emb.store_to_collection(ENTITY_COLLECTION, payload, vec![1.0, 0.0, 0.0, 0.0])
1406 .await
1407 .unwrap();
1408
1409 let provider = make_mock_provider_with_embedding(vec![0.0, 1.0, 0.0, 0.0]);
1411 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1412
1413 let resolver = EntityResolver::new(&gs)
1414 .with_embedding_store(&emb)
1415 .with_provider(&any_provider)
1416 .with_thresholds(0.85, 0.70);
1417
1418 let (id, outcome) = resolver
1419 .resolve("kotlin", "language", Some("kotlin language"))
1420 .await
1421 .unwrap();
1422
1423 assert_ne!(id, existing_id, "orthogonal entity should create new");
1424 assert_eq!(outcome, ResolutionOutcome::Created);
1425 }
1426
1427 #[tokio::test]
1428 async fn resolve_with_embedding_failure_falls_back_to_create() {
1429 let sqlite2 = SqliteStore::new(":memory:").await.unwrap();
1432 let pool2 = sqlite2.pool().clone();
1433 let mem2 = Box::new(InMemoryVectorStore::new());
1434 let emb2 = Arc::new(EmbeddingStore::with_store(mem2, pool2));
1435 let gs2 = GraphStore::new(sqlite2.pool().clone());
1436
1437 let mut mock = zeph_llm::mock::MockProvider::default();
1438 mock.supports_embeddings = false;
1439 let any_provider = zeph_llm::any::AnyProvider::Mock(mock);
1440
1441 let resolver = EntityResolver::new(&gs2)
1442 .with_embedding_store(&emb2)
1443 .with_provider(&any_provider);
1444
1445 let (id, outcome) = resolver
1446 .resolve("testentity", "concept", Some("summary"))
1447 .await
1448 .unwrap();
1449 assert!(id > 0);
1450 assert_eq!(outcome, ResolutionOutcome::Created);
1451 }
1452
1453 #[tokio::test]
1454 async fn resolve_fallback_increments_counter() {
1455 let (gs, emb) = setup_with_embedding().await;
1456
1457 let mut mock = zeph_llm::mock::MockProvider::default();
1459 mock.supports_embeddings = false;
1460 let any_provider = zeph_llm::any::AnyProvider::Mock(mock);
1461
1462 let resolver = EntityResolver::new(&gs)
1463 .with_embedding_store(&emb)
1464 .with_provider(&any_provider);
1465
1466 let fallback_count = resolver.fallback_count();
1467
1468 resolver.resolve("entity_a", "concept", None).await.unwrap();
1470
1471 assert_eq!(
1472 fallback_count.load(std::sync::atomic::Ordering::Relaxed),
1473 1,
1474 "fallback counter should be 1 after embed failure"
1475 );
1476 }
1477
1478 #[tokio::test]
1479 async fn resolve_batch_processes_multiple_entities() {
1480 let gs = setup().await;
1481 let resolver = EntityResolver::new(&gs);
1482
1483 let entities = vec![
1484 ExtractedEntity {
1485 name: "rust".into(),
1486 entity_type: "language".into(),
1487 summary: Some("systems language".into()),
1488 },
1489 ExtractedEntity {
1490 name: "python".into(),
1491 entity_type: "language".into(),
1492 summary: None,
1493 },
1494 ExtractedEntity {
1495 name: "cargo".into(),
1496 entity_type: "tool".into(),
1497 summary: Some("rust build tool".into()),
1498 },
1499 ];
1500
1501 let results = resolver.resolve_batch(&entities).await.unwrap();
1502 assert_eq!(results.len(), 3);
1503 for (id, outcome) in &results {
1504 assert!(*id > 0);
1505 assert_eq!(*outcome, ResolutionOutcome::Created);
1506 }
1507 }
1508
1509 #[tokio::test]
1510 async fn resolve_batch_empty_returns_empty() {
1511 let gs = setup().await;
1512 let resolver = EntityResolver::new(&gs);
1513 let results = resolver.resolve_batch(&[]).await.unwrap();
1514 assert!(results.is_empty());
1515 }
1516
1517 #[tokio::test]
1518 async fn merge_combines_summaries() {
1519 let (gs, emb) = setup_with_embedding().await;
1520 let existing_id = gs
1524 .upsert_entity(
1525 "mergetest v1",
1526 "mergetest v1",
1527 EntityType::Concept,
1528 Some("first summary"),
1529 )
1530 .await
1531 .unwrap();
1532
1533 let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1534 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1535 .await
1536 .unwrap();
1537 let payload = serde_json::json!({
1538 "entity_id": existing_id,
1539 "name": "mergetest v1",
1540 "entity_type": "concept",
1541 "summary": "first summary",
1542 });
1543 let point_id = emb
1544 .store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1545 .await
1546 .unwrap();
1547 gs.set_entity_qdrant_point_id(existing_id, &point_id)
1548 .await
1549 .unwrap();
1550
1551 let provider = make_mock_provider_with_embedding(mock_vec);
1552 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1553
1554 let resolver = EntityResolver::new(&gs)
1555 .with_embedding_store(&emb)
1556 .with_provider(&any_provider)
1557 .with_thresholds(0.85, 0.70);
1558
1559 let (id, outcome) = resolver
1561 .resolve("mergetest v2", "concept", Some("second summary"))
1562 .await
1563 .unwrap();
1564
1565 assert_eq!(id, existing_id);
1566 assert!(matches!(outcome, ResolutionOutcome::EmbeddingMatch { .. }));
1567
1568 let entity = gs
1570 .find_entity("mergetest v1", EntityType::Concept)
1571 .await
1572 .unwrap()
1573 .unwrap();
1574 let summary = entity.summary.unwrap_or_default();
1575 assert!(
1576 summary.contains("first summary") && summary.contains("second summary"),
1577 "merged summary should contain both: got {summary:?}"
1578 );
1579 }
1580
1581 #[tokio::test]
1582 async fn merge_preserves_older_entity_id() {
1583 let (gs, emb) = setup_with_embedding().await;
1584 let existing_id = gs
1586 .upsert_entity(
1587 "legacy entity",
1588 "legacy entity",
1589 EntityType::Concept,
1590 Some("old info"),
1591 )
1592 .await
1593 .unwrap();
1594
1595 let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1596 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1597 .await
1598 .unwrap();
1599 let payload = serde_json::json!({
1600 "entity_id": existing_id,
1601 "name": "legacy entity",
1602 "entity_type": "concept",
1603 "summary": "old info",
1604 });
1605 emb.store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1606 .await
1607 .unwrap();
1608
1609 let provider = make_mock_provider_with_embedding(mock_vec);
1610 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1611
1612 let resolver = EntityResolver::new(&gs)
1613 .with_embedding_store(&emb)
1614 .with_provider(&any_provider)
1615 .with_thresholds(0.85, 0.70);
1616
1617 let (returned_id, _) = resolver
1618 .resolve("legacy entity variant", "concept", Some("new info"))
1619 .await
1620 .unwrap();
1621
1622 assert_eq!(
1623 returned_id, existing_id,
1624 "older entity ID should be preserved on merge"
1625 );
1626 }
1627
1628 #[tokio::test]
1629 async fn entity_type_filter_prevents_cross_type_merge() {
1630 let (gs, emb) = setup_with_embedding().await;
1631
1632 let person_id = gs
1634 .upsert_entity(
1635 "python",
1636 "python",
1637 EntityType::Person,
1638 Some("a person named python"),
1639 )
1640 .await
1641 .unwrap();
1642
1643 let mock_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1644 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1645 .await
1646 .unwrap();
1647 let payload = serde_json::json!({
1648 "entity_id": person_id,
1649 "name": "python",
1650 "entity_type": "person",
1651 "summary": "a person named python",
1652 });
1653 emb.store_to_collection(ENTITY_COLLECTION, payload, mock_vec.clone())
1654 .await
1655 .unwrap();
1656
1657 let provider = make_mock_provider_with_embedding(mock_vec);
1658 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1659
1660 let resolver = EntityResolver::new(&gs)
1661 .with_embedding_store(&emb)
1662 .with_provider(&any_provider)
1663 .with_thresholds(0.85, 0.70);
1664
1665 let (lang_id, outcome) = resolver
1667 .resolve("python", "language", Some("python language"))
1668 .await
1669 .unwrap();
1670
1671 assert_ne!(
1674 lang_id, person_id,
1675 "language entity should not merge with person entity"
1676 );
1677 assert_eq!(outcome, ResolutionOutcome::Created);
1680 }
1681
1682 #[tokio::test]
1683 async fn custom_thresholds_respected() {
1684 let (gs, emb) = setup_with_embedding().await;
1685 let existing_id = gs
1689 .upsert_entity(
1690 "threshold_test",
1691 "threshold_test",
1692 EntityType::Concept,
1693 Some("base"),
1694 )
1695 .await
1696 .unwrap();
1697
1698 let existing_vec = vec![1.0_f32, 0.0, 0.0, 0.0];
1699 emb.ensure_named_collection(ENTITY_COLLECTION, 4)
1700 .await
1701 .unwrap();
1702 let payload = serde_json::json!({
1703 "entity_id": existing_id,
1704 "name": "threshold_test",
1705 "entity_type": "concept",
1706 "summary": "base",
1707 });
1708 emb.store_to_collection(ENTITY_COLLECTION, payload, existing_vec)
1709 .await
1710 .unwrap();
1711
1712 let provider = make_mock_provider_with_embedding(vec![0.0, 1.0, 0.0, 0.0]);
1714 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1715
1716 let resolver = EntityResolver::new(&gs)
1718 .with_embedding_store(&emb)
1719 .with_provider(&any_provider)
1720 .with_thresholds(0.50, 0.30);
1721
1722 let (id, outcome) = resolver
1723 .resolve("new_concept", "concept", Some("different"))
1724 .await
1725 .unwrap();
1726
1727 assert_ne!(id, existing_id);
1728 assert_eq!(outcome, ResolutionOutcome::Created);
1729 }
1730
1731 #[tokio::test]
1732 async fn resolve_outcome_exact_match_no_embedding_store() {
1733 let gs = setup().await;
1734 let resolver = EntityResolver::new(&gs);
1735
1736 resolver.resolve("existing", "concept", None).await.unwrap();
1737 let (_, outcome) = resolver.resolve("existing", "concept", None).await.unwrap();
1738 assert_eq!(outcome, ResolutionOutcome::ExactMatch);
1739 }
1740
1741 #[tokio::test]
1742 async fn extract_json_strips_markdown_fences() {
1743 let with_fence = "```json\n{\"same_entity\": true}\n```";
1744 let extracted = extract_json(with_fence);
1745 let parsed: DisambiguationResponse = serde_json::from_str(extracted).unwrap();
1746 assert!(parsed.same_entity);
1747
1748 let without_fence = "{\"same_entity\": false}";
1749 let extracted2 = extract_json(without_fence);
1750 let parsed2: DisambiguationResponse = serde_json::from_str(extracted2).unwrap();
1751 assert!(!parsed2.same_entity);
1752 }
1753
1754 fn make_mock_with_embedding_and_chat(
1756 embedding: Vec<f32>,
1757 chat_responses: Vec<String>,
1758 ) -> zeph_llm::mock::MockProvider {
1759 let mut p = zeph_llm::mock::MockProvider::with_responses(chat_responses);
1760 p.embedding = embedding;
1761 p.supports_embeddings = true;
1762 p
1763 }
1764
1765 async fn seed_entity_with_vector(
1767 gs: &GraphStore,
1768 emb: &Arc<EmbeddingStore>,
1769 name: &str,
1770 entity_type: EntityType,
1771 summary: &str,
1772 vector: Vec<f32>,
1773 ) -> i64 {
1774 let id = gs
1775 .upsert_entity(name, name, entity_type, Some(summary))
1776 .await
1777 .unwrap();
1778 emb.ensure_named_collection(ENTITY_COLLECTION, u64::try_from(vector.len()).unwrap())
1779 .await
1780 .unwrap();
1781 let payload = serde_json::json!({
1782 "entity_id": id,
1783 "name": name,
1784 "entity_type": entity_type.as_str(),
1785 "summary": summary,
1786 });
1787 let point_id = emb
1788 .store_to_collection(ENTITY_COLLECTION, payload, vector)
1789 .await
1790 .unwrap();
1791 gs.set_entity_qdrant_point_id(id, &point_id).await.unwrap();
1792 id
1793 }
1794
1795 #[tokio::test]
1798 async fn resolve_ambiguous_score_llm_says_merge() {
1799 let (gs, emb) = setup_with_embedding().await;
1802 let existing_id = seed_entity_with_vector(
1803 &gs,
1804 &emb,
1805 "goroutine",
1806 EntityType::Concept,
1807 "go concurrency primitive",
1808 vec![1.0, 0.0, 0.0, 0.0],
1809 )
1810 .await;
1811
1812 let provider = make_mock_with_embedding_and_chat(
1814 vec![1.0, 1.0, 0.0, 0.0],
1815 vec![r#"{"same_entity": true}"#.to_owned()],
1816 );
1817 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1818
1819 let resolver = EntityResolver::new(&gs)
1820 .with_embedding_store(&emb)
1821 .with_provider(&any_provider)
1822 .with_thresholds(0.85, 0.50);
1823
1824 let (id, outcome) = resolver
1825 .resolve("goroutine concurrency", "concept", Some("go concurrency"))
1826 .await
1827 .unwrap();
1828
1829 assert_eq!(
1830 id, existing_id,
1831 "should return existing entity ID on LLM merge"
1832 );
1833 assert_eq!(outcome, ResolutionOutcome::LlmDisambiguated);
1834 }
1835
1836 #[tokio::test]
1839 async fn resolve_ambiguous_score_llm_says_different() {
1840 let (gs, emb) = setup_with_embedding().await;
1841 let existing_id = seed_entity_with_vector(
1842 &gs,
1843 &emb,
1844 "channel",
1845 EntityType::Concept,
1846 "go channel",
1847 vec![1.0, 0.0, 0.0, 0.0],
1848 )
1849 .await;
1850
1851 let provider = make_mock_with_embedding_and_chat(
1853 vec![1.0, 1.0, 0.0, 0.0],
1854 vec![r#"{"same_entity": false}"#.to_owned()],
1855 );
1856 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1857
1858 let resolver = EntityResolver::new(&gs)
1859 .with_embedding_store(&emb)
1860 .with_provider(&any_provider)
1861 .with_thresholds(0.85, 0.50);
1862
1863 let (id, outcome) = resolver
1864 .resolve("network channel", "concept", Some("networking channel"))
1865 .await
1866 .unwrap();
1867
1868 assert_ne!(
1869 id, existing_id,
1870 "LLM-rejected match should create new entity"
1871 );
1872 assert_eq!(outcome, ResolutionOutcome::Created);
1873 }
1874
1875 #[tokio::test]
1878 async fn resolve_ambiguous_score_llm_failure_increments_fallback() {
1879 let (gs, emb) = setup_with_embedding().await;
1880 let existing_id = seed_entity_with_vector(
1881 &gs,
1882 &emb,
1883 "mutex",
1884 EntityType::Concept,
1885 "mutual exclusion lock",
1886 vec![1.0, 0.0, 0.0, 0.0],
1887 )
1888 .await;
1889
1890 let mut provider = make_mock_with_embedding_and_chat(vec![1.0, 1.0, 0.0, 0.0], vec![]);
1892 provider.fail_chat = true;
1893 let any_provider = zeph_llm::any::AnyProvider::Mock(provider);
1894
1895 let resolver = EntityResolver::new(&gs)
1896 .with_embedding_store(&emb)
1897 .with_provider(&any_provider)
1898 .with_thresholds(0.85, 0.50);
1899
1900 let fallback_count = resolver.fallback_count();
1901
1902 let (id, outcome) = resolver
1903 .resolve("mutex lock", "concept", Some("synchronization primitive"))
1904 .await
1905 .unwrap();
1906
1907 assert_ne!(
1909 id, existing_id,
1910 "LLM failure should create new entity (fallback)"
1911 );
1912 assert_eq!(outcome, ResolutionOutcome::Created);
1913 assert_eq!(
1914 fallback_count.load(std::sync::atomic::Ordering::Relaxed),
1915 1,
1916 "fallback counter should be incremented on LLM chat failure"
1917 );
1918 }
1919
1920 #[tokio::test]
1923 async fn resolve_creates_entity_with_canonical_name() {
1924 let gs = setup().await;
1925 let resolver = EntityResolver::new(&gs);
1926 let (id, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1927 assert!(id > 0);
1928 let entity = gs
1929 .find_entity("rust", EntityType::Language)
1930 .await
1931 .unwrap()
1932 .unwrap();
1933 assert_eq!(entity.canonical_name, "rust");
1934 }
1935
1936 #[tokio::test]
1937 async fn resolve_adds_alias_on_create() {
1938 let gs = setup().await;
1939 let resolver = EntityResolver::new(&gs);
1940 let (id, _) = resolver.resolve("Rust", "language", None).await.unwrap();
1941 let aliases = gs.aliases_for_entity(id).await.unwrap();
1942 assert!(
1943 !aliases.is_empty(),
1944 "new entity should have at least one alias"
1945 );
1946 assert!(aliases.iter().any(|a| a.alias_name == "rust"));
1947 }
1948
1949 #[tokio::test]
1950 async fn resolve_reuses_entity_by_alias() {
1951 let gs = setup().await;
1952 let resolver = EntityResolver::new(&gs);
1953
1954 let (id1, _) = resolver.resolve("rust", "language", None).await.unwrap();
1956 gs.add_alias(id1, "rust-lang").await.unwrap();
1957
1958 let (id2, _) = resolver
1960 .resolve("rust-lang", "language", None)
1961 .await
1962 .unwrap();
1963 assert_eq!(
1964 id1, id2,
1965 "'rust-lang' alias should resolve to same entity as 'rust'"
1966 );
1967 }
1968
1969 #[tokio::test]
1970 async fn resolve_alias_match_respects_entity_type() {
1971 let gs = setup().await;
1972 let resolver = EntityResolver::new(&gs);
1973
1974 let (lang_id, _) = resolver.resolve("python", "language", None).await.unwrap();
1976
1977 let (tool_id, _) = resolver.resolve("python", "tool", None).await.unwrap();
1979 assert_ne!(
1980 lang_id, tool_id,
1981 "same name with different type should be separate entities"
1982 );
1983 }
1984
1985 #[tokio::test]
1986 async fn resolve_preserves_existing_aliases() {
1987 let gs = setup().await;
1988 let resolver = EntityResolver::new(&gs);
1989
1990 let (id, _) = resolver.resolve("rust", "language", None).await.unwrap();
1991 gs.add_alias(id, "rust-lang").await.unwrap();
1992
1993 resolver
1995 .resolve("rust", "language", Some("updated"))
1996 .await
1997 .unwrap();
1998 let aliases = gs.aliases_for_entity(id).await.unwrap();
1999 assert!(
2000 aliases.iter().any(|a| a.alias_name == "rust-lang"),
2001 "prior alias must be preserved"
2002 );
2003 }
2004
2005 #[tokio::test]
2006 async fn resolve_original_form_registered_as_alias() {
2007 let gs = setup().await;
2008 let resolver = EntityResolver::new(&gs);
2009
2010 let (id, _) = resolver
2013 .resolve(" Rust ", "language", None)
2014 .await
2015 .unwrap();
2016 let aliases = gs.aliases_for_entity(id).await.unwrap();
2017 assert!(aliases.iter().any(|a| a.alias_name == "rust"));
2018 }
2019
2020 #[tokio::test]
2021 async fn resolve_entity_with_many_aliases() {
2022 let gs = setup().await;
2023 let id = gs
2024 .upsert_entity("bigentity", "bigentity", EntityType::Concept, None)
2025 .await
2026 .unwrap();
2027 for i in 0..100 {
2028 gs.add_alias(id, &format!("alias-{i}")).await.unwrap();
2029 }
2030 let aliases = gs.aliases_for_entity(id).await.unwrap();
2031 assert_eq!(aliases.len(), 100);
2032
2033 let results = gs.find_entities_fuzzy("alias-50", 10).await.unwrap();
2035 assert!(results.iter().any(|e| e.id == id));
2036 }
2037}