1use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_common::sanitize::strip_control_chars;
12use zeph_common::text::truncate_to_bytes_ref;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{LlmProvider as _, Message, Role};
15
16use super::store::GraphStore;
17use super::types::EntityType;
18use crate::embedding_store::EmbeddingStore;
19use crate::error::MemoryError;
20use crate::graph::extractor::ExtractedEntity;
21use crate::types::MessageId;
22use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
23
/// Minimum byte length of a normalized entity name; `resolve` rejects anything shorter.
const MIN_ENTITY_NAME_BYTES: usize = 3;
/// Byte cap applied to entity names when they are normalized.
const MAX_ENTITY_NAME_BYTES: usize = 512;
/// Byte cap applied to edge relation strings.
const MAX_RELATION_BYTES: usize = 256;
/// Byte cap applied to edge facts and to entity summaries before embedding or merging.
const MAX_FACT_BYTES: usize = 2048;

/// Name of the vector-store collection that holds entity embeddings.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Timeout, in seconds, for the embedding calls made while resolving or merging entities.
const EMBED_TIMEOUT_SECS: u64 = 30;
38
/// Describes how an entity mention was mapped to the entity id returned by the resolver.
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub enum ResolutionOutcome {
    /// An existing entity was found by an exact alias or name lookup.
    ExactMatch,
    /// The best vector-search candidate scored at or above the similarity threshold.
    EmbeddingMatch {
        /// Score reported by the vector search for the matched candidate.
        score: f32,
    },
    /// The candidate scored in the ambiguous band and the LLM confirmed it is the same entity.
    LlmDisambiguated,
    /// No existing entity matched, so a new one was created.
    Created,
}
52
/// JSON shape expected back from the LLM when it is asked whether two entities are the same.
#[derive(Debug, Deserialize, JsonSchema)]
struct DisambiguationResponse {
    /// `true` when the model judges both descriptions to refer to the same entity.
    same_entity: bool,
}
58
/// Shared map from a normalized entity name to the async mutex that serializes its resolution.
type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
68
/// Resolves extracted entity mentions and edges against the graph store.
///
/// Exact alias/name lookups are always available. Embedding-based matching and LLM
/// disambiguation are only attempted when both an embedding store and a provider are set.
pub struct EntityResolver<'a> {
    /// Graph store used for every entity, alias and edge read/write.
    store: &'a GraphStore,
    /// Optional vector store used to search and persist entity embeddings.
    embedding_store: Option<&'a Arc<EmbeddingStore>>,
    /// Optional LLM provider used for embeddings and disambiguation chats.
    provider: Option<&'a AnyProvider>,
    /// Score at or above which the best vector candidate is merged without asking the LLM.
    similarity_threshold: f32,
    /// Score at or above which (but below `similarity_threshold`) the LLM is asked to decide.
    ambiguous_threshold: f32,
    /// Per-normalized-name locks taken by `resolve`.
    name_locks: NameLockMap,
    /// Incremented whenever an embedding call, vector search or LLM disambiguation fails
    /// and resolution falls back to a simpler path.
    fallback_count: Arc<std::sync::atomic::AtomicU64>,
    /// Guards the one-time "ensure collection exists" call made before storing embeddings.
    collection_ensured: Arc<tokio::sync::OnceCell<()>>,
    /// Timeout used by `resolve_edge_typed` for its embedding work.
    embed_timeout: std::time::Duration,
}
85
86impl<'a> EntityResolver<'a> {
/// Returns the graph store this resolver reads from and writes to.
#[must_use]
pub fn graph_store(&self) -> &GraphStore {
    self.store
}
92
93 #[must_use]
94 pub fn new(store: &'a GraphStore) -> Self {
95 Self {
96 store,
97 embedding_store: None,
98 provider: None,
99 similarity_threshold: 0.85,
100 ambiguous_threshold: 0.70,
101 name_locks: Arc::new(DashMap::new()),
102 fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
103 collection_ensured: Arc::new(tokio::sync::OnceCell::new()),
104 embed_timeout: std::time::Duration::from_secs(5),
105 }
106 }
107
108 #[must_use]
112 pub fn with_embed_timeout(mut self, timeout_secs: u64) -> Self {
113 self.embed_timeout = std::time::Duration::from_secs(timeout_secs);
114 self
115 }
116
117 #[must_use]
118 pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
119 self.embedding_store = Some(store);
120 self
121 }
122
123 #[must_use]
124 pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
125 self.provider = Some(provider);
126 self
127 }
128
129 #[must_use]
130 pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
131 self.similarity_threshold = similarity;
132 self.ambiguous_threshold = ambiguous;
133 self
134 }
135
136 #[must_use]
138 pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
139 Arc::clone(&self.fallback_count)
140 }
141
142 fn normalize_name(name: &str) -> String {
144 let lowered = name.trim().to_lowercase();
145 let cleaned = strip_control_chars(&lowered);
146 let normalized = truncate_to_bytes_ref(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
147 if normalized.len() < cleaned.len() {
148 tracing::debug!(
149 "graph resolver: entity name truncated to {} bytes",
150 MAX_ENTITY_NAME_BYTES
151 );
152 }
153 normalized
154 }
155
156 fn parse_entity_type(entity_type: &str) -> EntityType {
158 entity_type
159 .trim()
160 .to_lowercase()
161 .parse::<EntityType>()
162 .unwrap_or_else(|_| {
163 tracing::debug!(
164 "graph resolver: unknown entity type {:?}, falling back to Concept",
165 entity_type
166 );
167 EntityType::Concept
168 })
169 }
170
171 async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
173 let lock = self
174 .name_locks
175 .entry(normalized.to_owned())
176 .or_insert_with(|| Arc::new(Mutex::new(())))
177 .clone();
178 lock.lock_owned().await
179 }
180
181 pub async fn resolve(
200 &self,
201 name: &str,
202 entity_type: &str,
203 summary: Option<&str>,
204 ) -> Result<(i64, ResolutionOutcome), MemoryError> {
205 let normalized = Self::normalize_name(name);
206
207 if normalized.is_empty() {
208 return Err(MemoryError::GraphStore("empty entity name".into()));
209 }
210
211 if normalized.len() < MIN_ENTITY_NAME_BYTES {
212 return Err(MemoryError::GraphStore(format!(
213 "entity name too short: {normalized:?} ({} bytes, min {MIN_ENTITY_NAME_BYTES})",
214 normalized.len()
215 )));
216 }
217
218 let et = Self::parse_entity_type(entity_type);
219
220 let surface_name = name.trim().to_owned();
222
223 let _guard = self.lock_name(&normalized).await;
225
226 if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
228 self.store
229 .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
230 .await?;
231 return Ok((entity.id.0, ResolutionOutcome::ExactMatch));
232 }
233
234 if let Some(entity) = self.store.find_entity(&normalized, et).await? {
236 self.store
237 .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
238 .await?;
239 return Ok((entity.id.0, ResolutionOutcome::ExactMatch));
240 }
241
242 if let Some(outcome) = self
244 .resolve_via_embedding(&normalized, name, &surface_name, et, summary)
245 .await?
246 {
247 return Ok(outcome);
248 }
249
250 let entity_id = self
252 .store
253 .upsert_entity(&surface_name, &normalized, et, summary)
254 .await?;
255
256 self.register_aliases(entity_id.0, &normalized, name)
257 .await?;
258
259 Ok((entity_id.0, ResolutionOutcome::Created))
260 }
261
262 async fn embed_entity_text(
265 &self,
266 provider: &AnyProvider,
267 normalized: &str,
268 summary: Option<&str>,
269 ) -> Option<Vec<f32>> {
270 let safe_summary = truncate_to_bytes_ref(summary.unwrap_or(""), MAX_FACT_BYTES);
271 let embed_text = format!("{normalized}: {safe_summary}");
272 let embed_result = tokio::time::timeout(
273 std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
274 provider.embed(&embed_text),
275 )
276 .await;
277 match embed_result {
278 Ok(Ok(v)) => Some(v),
279 Ok(Err(err)) => {
280 self.fallback_count
281 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
282 tracing::warn!(entity_name = %normalized, error = %err,
283 "embed() failed; falling back to exact-match-only entity creation");
284 None
285 }
286 Err(_timeout) => {
287 self.fallback_count
288 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
289 tracing::warn!(entity_name = %normalized,
290 "embed() timed out after {}s; falling back to create new entity",
291 EMBED_TIMEOUT_SECS);
292 None
293 }
294 }
295 }
296
297 #[allow(clippy::too_many_arguments)] async fn handle_ambiguous_candidate(
301 &self,
302 emb_store: &EmbeddingStore,
303 provider: &AnyProvider,
304 payload: &std::collections::HashMap<String, serde_json::Value>,
305 point_id: &str,
306 score: f32,
307 surface_name: &str,
308 normalized: &str,
309 et: EntityType,
310 summary: Option<&str>,
311 ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
312 let entity_id = payload
313 .get("entity_id")
314 .and_then(serde_json::Value::as_i64)
315 .ok_or_else(|| MemoryError::GraphStore("missing entity_id in payload".into()))?;
316 let existing_name = payload
317 .get("name")
318 .and_then(|v| v.as_str())
319 .unwrap_or("")
320 .to_owned();
321 let existing_summary = payload
322 .get("summary")
323 .and_then(|v| v.as_str())
324 .unwrap_or("")
325 .to_owned();
326 let existing_type = payload
328 .get("entity_type")
329 .and_then(|v| v.as_str())
330 .unwrap_or(et.as_str())
331 .to_owned();
332 let existing_canonical = payload.get("canonical_name").and_then(|v| v.as_str());
333 let existing_summary_str = payload.get("summary").and_then(|v| v.as_str());
334 match self
335 .llm_disambiguate(
336 provider,
337 normalized,
338 et.as_str(),
339 summary.unwrap_or(""),
340 &existing_name,
341 &existing_type,
342 &existing_summary,
343 score,
344 )
345 .await
346 {
347 Some(true) => {
348 self.merge_entity(
349 emb_store,
350 provider,
351 entity_id,
352 surface_name,
353 normalized,
354 et,
355 summary,
356 existing_canonical,
357 existing_summary_str,
358 Some(point_id),
359 )
360 .await?;
361 Ok(Some((entity_id, ResolutionOutcome::LlmDisambiguated)))
362 }
363 Some(false) => Ok(None),
364 None => {
365 self.fallback_count
366 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
367 tracing::warn!(entity_name = %normalized,
368 "LLM disambiguation failed; falling back to create new entity");
369 Ok(None)
370 }
371 }
372 }
373
374 async fn resolve_via_embedding(
377 &self,
378 normalized: &str,
379 original_name: &str,
380 surface_name: &str,
381 et: EntityType,
382 summary: Option<&str>,
383 ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
384 let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) else {
385 return Ok(None);
386 };
387
388 let Some(query_vec) = self.embed_entity_text(provider, normalized, summary).await else {
389 return Ok(None);
390 };
391
392 let type_filter = VectorFilter {
393 must: vec![FieldCondition {
394 field: "entity_type".into(),
395 value: FieldValue::Text(et.as_str().to_owned()),
396 }],
397 must_not: vec![],
398 };
399 let candidates = match emb_store
400 .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
401 .await
402 {
403 Ok(c) => c,
404 Err(err) => {
405 self.fallback_count
406 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
407 tracing::warn!(entity_name = %normalized, error = %err,
408 "Qdrant search failed; falling back to create new entity");
409 return self
410 .create_with_embedding(
411 emb_store,
412 surface_name,
413 normalized,
414 original_name,
415 et,
416 summary,
417 &query_vec,
418 )
419 .await
420 .map(Some);
421 }
422 };
423
424 if let Some(best) = candidates.first() {
425 let score = best.score;
426 if score >= self.similarity_threshold {
427 let entity_id = best
428 .payload
429 .get("entity_id")
430 .and_then(serde_json::Value::as_i64)
431 .ok_or_else(|| {
432 MemoryError::GraphStore("missing entity_id in payload".into())
433 })?;
434 let existing_canonical =
435 best.payload.get("canonical_name").and_then(|v| v.as_str());
436 let existing_summary = best.payload.get("summary").and_then(|v| v.as_str());
437 let existing_pid = Some(best.id.as_str());
438 self.merge_entity(
439 emb_store,
440 provider,
441 entity_id,
442 surface_name,
443 normalized,
444 et,
445 summary,
446 existing_canonical,
447 existing_summary,
448 existing_pid,
449 )
450 .await?;
451 return Ok(Some((
452 entity_id,
453 ResolutionOutcome::EmbeddingMatch { score },
454 )));
455 } else if score >= self.ambiguous_threshold
456 && let Some(result) = self
457 .handle_ambiguous_candidate(
458 emb_store,
459 provider,
460 &best.payload,
461 &best.id,
462 score,
463 surface_name,
464 normalized,
465 et,
466 summary,
467 )
468 .await?
469 {
470 return Ok(Some(result));
471 }
472 }
474
475 self.create_with_embedding(
477 emb_store,
478 surface_name,
479 normalized,
480 original_name,
481 et,
482 summary,
483 &query_vec,
484 )
485 .await
486 .map(Some)
487 }
488
489 #[allow(clippy::too_many_arguments)] async fn create_with_embedding(
492 &self,
493 emb_store: &EmbeddingStore,
494 surface_name: &str,
495 normalized: &str,
496 original_name: &str,
497 et: EntityType,
498 summary: Option<&str>,
499 query_vec: &[f32],
500 ) -> Result<(i64, ResolutionOutcome), MemoryError> {
501 let entity_id = self
502 .store
503 .upsert_entity(surface_name, normalized, et, summary)
504 .await?;
505 self.register_aliases(entity_id.0, normalized, original_name)
506 .await?;
507 self.store_entity_embedding(
508 emb_store,
509 entity_id.0,
510 None,
511 normalized,
512 et,
513 summary.unwrap_or(""),
514 query_vec,
515 )
516 .await;
517 Ok((entity_id.0, ResolutionOutcome::Created))
518 }
519
520 async fn register_aliases(
522 &self,
523 entity_id: i64,
524 normalized: &str,
525 original_name: &str,
526 ) -> Result<(), MemoryError> {
527 self.store.add_alias(entity_id, normalized).await?;
528
529 let original_trimmed = original_name.trim().to_lowercase();
532 let original_clean_str = strip_control_chars(&original_trimmed);
533 let original_clean = truncate_to_bytes_ref(&original_clean_str, MAX_ENTITY_NAME_BYTES);
534 if original_clean != normalized {
535 self.store.add_alias(entity_id, original_clean).await?;
536 }
537
538 Ok(())
539 }
540
541 #[allow(clippy::too_many_arguments)] async fn merge_entity(
549 &self,
550 emb_store: &EmbeddingStore,
551 provider: &AnyProvider,
552 entity_id: i64,
553 new_surface_name: &str,
554 new_canonical_name: &str,
555 entity_type: EntityType,
556 new_summary: Option<&str>,
557 existing_canonical_name: Option<&str>,
558 existing_summary_payload: Option<&str>,
559 existing_point_id: Option<&str>,
560 ) -> Result<(), MemoryError> {
561 let (existing_canonical, existing_summary, existing_point_id_owned) =
566 if existing_canonical_name.is_some() && existing_summary_payload.is_some() {
567 (
568 existing_canonical_name
569 .unwrap_or(new_canonical_name)
570 .to_owned(),
571 existing_summary_payload.unwrap_or("").to_owned(),
572 existing_point_id.map(ToOwned::to_owned),
573 )
574 } else {
575 let existing = self.store.find_entity_by_id(entity_id).await?;
580 let canonical = existing_canonical_name.map_or_else(
581 || {
582 existing.as_ref().map_or_else(
583 || new_canonical_name.to_owned(),
584 |e| e.canonical_name.clone(),
585 )
586 },
587 ToOwned::to_owned,
588 );
589 let summary = existing
590 .as_ref()
591 .and_then(|e| e.summary.as_deref())
592 .unwrap_or("")
593 .to_owned();
594 let pid = existing_point_id.map(ToOwned::to_owned).or_else(|| {
595 existing
596 .as_ref()
597 .and_then(|e| e.qdrant_point_id.as_deref())
598 .map(ToOwned::to_owned)
599 });
600 (canonical, summary, pid)
601 };
602
603 let merged_summary = if let Some(new) = new_summary {
604 if !new.is_empty() && !existing_summary.is_empty() {
605 let combined = format!("{existing_summary}; {new}");
606 truncate_to_bytes_ref(&combined, MAX_FACT_BYTES).to_owned()
608 } else if !new.is_empty() {
609 new.to_owned()
610 } else {
611 existing_summary.clone()
612 }
613 } else {
614 existing_summary.clone()
615 };
616
617 let summary_opt = if merged_summary.is_empty() {
618 None
619 } else {
620 Some(merged_summary.as_str())
621 };
622
623 self.store
626 .upsert_entity(
627 new_surface_name,
628 &existing_canonical,
629 entity_type,
630 summary_opt,
631 )
632 .await?;
633
634 let embed_text = format!("{new_surface_name}: {merged_summary}");
636 let embed_result = tokio::time::timeout(
637 std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
638 provider.embed(&embed_text),
639 )
640 .await;
641
642 match embed_result {
643 Ok(Ok(vec)) => {
644 self.store_entity_embedding(
645 emb_store,
646 entity_id,
647 existing_point_id_owned.as_deref(),
648 new_surface_name,
649 entity_type,
650 &merged_summary,
651 &vec,
652 )
653 .await;
654 }
655 Ok(Err(err)) => {
656 tracing::warn!(
657 entity_id,
658 error = %err,
659 "merge re-embed failed; Qdrant entry may be stale"
660 );
661 }
662 Err(_) => {
663 tracing::warn!(
664 entity_id,
665 "merge re-embed timed out; Qdrant entry may be stale"
666 );
667 }
668 }
669
670 Ok(())
671 }
672
673 #[allow(clippy::too_many_arguments)] async fn store_entity_embedding(
682 &self,
683 emb_store: &EmbeddingStore,
684 entity_id: i64,
685 existing_point_id: Option<&str>,
686 name: &str,
687 entity_type: EntityType,
688 summary: &str,
689 vector: &[f32],
690 ) {
691 let vector_size = u64::try_from(vector.len()).unwrap_or(384);
697 let collection_ensured = Arc::clone(&self.collection_ensured);
698 if let Err(err) = collection_ensured
699 .get_or_try_init(|| async {
700 emb_store
701 .ensure_named_collection(ENTITY_COLLECTION, vector_size)
702 .await
703 })
704 .await
705 {
706 tracing::error!(
707 error = %err,
708 "failed to ensure entity embedding collection; skipping Qdrant upsert"
709 );
710 return;
711 }
712
713 let payload = serde_json::json!({
714 "entity_id": entity_id,
715 "entity_id_str": entity_id.to_string(),
719 "canonical_name": name,
720 "name": name,
721 "entity_type": entity_type.as_str(),
722 "summary": summary,
723 });
724
725 if let Some(point_id) = existing_point_id {
726 if let Err(err) = emb_store
728 .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
729 .await
730 {
731 tracing::warn!(
732 entity_id,
733 error = %err,
734 "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
735 );
736 }
737 } else {
738 match emb_store
739 .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
740 .await
741 {
742 Ok(point_id) => {
743 if let Err(err) = self
744 .store
745 .set_entity_qdrant_point_id(entity_id, &point_id)
746 .await
747 {
748 tracing::warn!(
749 entity_id,
750 error = %err,
751 "failed to store qdrant_point_id in SQLite"
752 );
753 }
754 }
755 Err(err) => {
756 tracing::warn!(
757 entity_id,
758 error = %err,
759 "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
760 );
761 }
762 }
763 }
764 }
765
766 #[allow(clippy::too_many_arguments)] async fn llm_disambiguate(
771 &self,
772 provider: &AnyProvider,
773 new_name: &str,
774 new_type: &str,
775 new_summary: &str,
776 existing_name: &str,
777 existing_type: &str,
778 existing_summary: &str,
779 score: f32,
780 ) -> Option<bool> {
781 let prompt = format!(
782 "New entity:\n\
783 - Name: <external-data>{new_name}</external-data>\n\
784 - Type: <external-data>{new_type}</external-data>\n\
785 - Summary: <external-data>{new_summary}</external-data>\n\
786 \n\
787 Existing entity:\n\
788 - Name: <external-data>{existing_name}</external-data>\n\
789 - Type: <external-data>{existing_type}</external-data>\n\
790 - Summary: <external-data>{existing_summary}</external-data>\n\
791 \n\
792 Cosine similarity: {score:.2}\n\
793 \n\
794 Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
795 );
796
797 let messages = [
798 Message::from_legacy(
799 Role::System,
800 "You are an entity disambiguation assistant. Given a new entity mention and \
801 an existing entity from the knowledge graph, determine if they refer to the same \
802 real-world entity. Respond only with JSON.",
803 ),
804 Message::from_legacy(Role::User, prompt),
805 ];
806
807 let response = match provider.chat(&messages).await {
808 Ok(r) => r,
809 Err(err) => {
810 tracing::warn!(error = %err, "LLM disambiguation chat failed");
811 return None;
812 }
813 };
814
815 let json_str = extract_json(&response);
817 match serde_json::from_str::<DisambiguationResponse>(json_str) {
818 Ok(parsed) => Some(parsed.same_entity),
819 Err(err) => {
820 tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
821 None
822 }
823 }
824 }
825
826 pub async fn resolve_batch(
839 &self,
840 entities: &[ExtractedEntity],
841 ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
842 if entities.is_empty() {
843 return Ok(Vec::new());
844 }
845
846 let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
848
849 let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
850 let name = e.name.clone();
851 let entity_type = e.entity_type.clone();
852 let summary = e.summary.clone();
853 async move {
854 let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
855 (i, result)
856 }
857 }))
858 .buffer_unordered(4);
859
860 while let Some((i, result)) = stream.next().await {
861 match result {
862 Ok(outcome) => results[i] = Some(outcome),
863 Err(err) => return Err(err),
864 }
865 }
866
867 Ok(results
868 .into_iter()
869 .enumerate()
870 .map(|(i, r)| {
871 r.unwrap_or_else(|| {
872 tracing::warn!(
873 index = i,
874 "resolve_batch: missing result at index — bug in stream collection"
875 );
876 panic!("resolve_batch: missing result at index {i}")
877 })
878 })
879 .collect())
880 }
881
882 pub async fn resolve_edge(
896 &self,
897 source_id: i64,
898 target_id: i64,
899 relation: &str,
900 fact: &str,
901 confidence: f32,
902 episode_id: Option<MessageId>,
903 ) -> Result<Option<i64>, MemoryError> {
904 let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
905 let normalized_relation =
906 truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
907
908 let fact_clean = strip_control_chars(fact.trim());
909 let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
910
911 let existing_edges = self.store.edges_exact(source_id, target_id).await?;
913
914 let matching = existing_edges
915 .iter()
916 .find(|e| e.relation == normalized_relation);
917
918 if let Some(old) = matching {
919 if old.fact == normalized_fact {
920 return Ok(None);
922 }
923 self.store.invalidate_edge(old.id).await?;
925 }
926
927 let new_id = self
928 .store
929 .insert_edge(
930 source_id,
931 target_id,
932 &normalized_relation,
933 &normalized_fact,
934 confidence,
935 episode_id,
936 )
937 .await?;
938 Ok(Some(new_id))
939 }
940
941 #[allow(clippy::too_many_arguments)] pub async fn resolve_edge_typed(
959 &self,
960 source_id: i64,
961 target_id: i64,
962 relation: &str,
963 fact: &str,
964 confidence: f32,
965 episode_id: Option<crate::types::MessageId>,
966 edge_type: crate::graph::EdgeType,
967 belief_revision: Option<&crate::graph::BeliefRevisionConfig>,
968 ) -> Result<Option<i64>, MemoryError> {
969 let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
970 let normalized_relation =
971 truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
972
973 let fact_clean = strip_control_chars(fact.trim());
974 let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
975
976 let existing_edges = self.store.edges_exact(source_id, target_id).await?;
977
978 let matching = existing_edges
980 .iter()
981 .find(|e| e.relation == normalized_relation && e.edge_type == edge_type);
982
983 if matching.is_some_and(|old| old.fact == normalized_fact) {
984 return Ok(None);
985 }
986
987 let superseded_ids: Vec<i64> = if let (Some(cfg), Some(provider)) =
989 (belief_revision, self.provider)
990 {
991 match tokio::time::timeout(self.embed_timeout, provider.embed(&normalized_fact)).await {
993 Ok(Ok(new_emb)) => {
994 match crate::graph::belief_revision::find_superseded_edges(
995 &existing_edges,
996 &new_emb,
997 &normalized_relation,
998 edge_type,
999 provider,
1000 cfg,
1001 self.embed_timeout,
1002 )
1003 .await
1004 {
1005 Ok(ids) => ids,
1006 Err(err) => {
1007 tracing::warn!(error = %err,
1008 "belief_revision: find_superseded_edges failed, falling back to exact match");
1009 matching.map(|e| vec![e.id]).unwrap_or_default()
1010 }
1011 }
1012 }
1013 Ok(Err(err)) => {
1014 tracing::warn!(error = %err,
1015 "belief_revision: embed new fact failed, falling back to exact match");
1016 matching.map(|e| vec![e.id]).unwrap_or_default()
1017 }
1018 Err(_) => {
1019 tracing::warn!(
1020 "belief_revision: embed new fact timed out, falling back to exact match"
1021 );
1022 matching.map(|e| vec![e.id]).unwrap_or_default()
1023 }
1024 }
1025 } else {
1026 matching.map(|e| vec![e.id]).unwrap_or_default()
1028 };
1029
1030 let new_id = self
1031 .store
1032 .insert_edge_typed(
1033 source_id,
1034 target_id,
1035 &normalized_relation,
1036 &normalized_fact,
1037 confidence,
1038 episode_id,
1039 edge_type,
1040 )
1041 .await?;
1042
1043 for old_id in superseded_ids {
1045 if belief_revision.is_some() {
1046 self.store
1047 .invalidate_edge_with_supersession(old_id, new_id)
1048 .await?;
1049 } else {
1050 self.store.invalidate_edge(old_id).await?;
1051 }
1052 }
1053
1054 Ok(Some(new_id))
1055 }
1056}
1057
/// Pulls the JSON portion out of an LLM reply.
///
/// Tried in order on the trimmed input:
/// 1. a reply starting with a "```json" fence — the text up to the last "```", trimmed;
/// 2. a reply starting with a bare "```" fence — likewise;
/// 3. the span from the first `{` to the last `}`, when the `{` comes first;
/// 4. otherwise the trimmed input itself.
fn extract_json(s: &str) -> &str {
    let trimmed = s.trim();

    // The more specific fence must be tried first so "json" is not left in the body.
    for fence in ["```json", "```"] {
        let fenced_body = trimmed.strip_prefix(fence).and_then(|inner| {
            inner
                .rfind("```")
                .map(|closing| inner[..closing].trim())
        });
        if let Some(body) = fenced_body {
            return body;
        }
    }

    match (trimmed.find('{'), trimmed.rfind('}')) {
        (Some(start), Some(end)) if start <= end => &trimmed[start..=end],
        _ => trimmed,
    }
}
1080
1081#[cfg(test)]
1082mod tests;