1use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_common::sanitize::strip_control_chars;
12use zeph_common::text::truncate_to_bytes_ref;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{LlmProvider as _, Message, Role};
15
16use super::store::GraphStore;
17use super::types::EntityType;
18use crate::embedding_store::EmbeddingStore;
19use crate::error::MemoryError;
20use crate::graph::extractor::ExtractedEntity;
21use crate::types::MessageId;
22use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
23
/// Minimum accepted byte length for a normalized entity name.
const MIN_ENTITY_NAME_BYTES: usize = 3;
/// Maximum stored byte length for a normalized entity name.
const MAX_ENTITY_NAME_BYTES: usize = 512;
/// Maximum stored byte length for a normalized relation label.
const MAX_RELATION_BYTES: usize = 256;
/// Maximum stored byte length for a fact / summary string.
const MAX_FACT_BYTES: usize = 2048;

/// Qdrant collection holding entity embeddings.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Hard timeout applied to every `provider.embed()` call.
const EMBED_TIMEOUT_SECS: u64 = 30;
38
/// How an entity mention was resolved against the knowledge graph.
#[derive(Debug, Clone, PartialEq)]
pub enum ResolutionOutcome {
    /// Matched an existing entity by exact normalized name or alias.
    ExactMatch,
    /// Matched via embedding similarity at or above the similarity threshold.
    EmbeddingMatch { score: f32 },
    /// Matched after the LLM confirmed an ambiguous embedding candidate.
    LlmDisambiguated,
    /// No match found; a new entity row was created.
    Created,
}
51
/// JSON shape expected from the LLM disambiguation call:
/// `{"same_entity": true}` or `{"same_entity": false}`.
#[derive(Debug, Deserialize, JsonSchema)]
struct DisambiguationResponse {
    same_entity: bool,
}
57
/// Per-normalized-name async locks serializing concurrent resolution of the
/// same entity name. NOTE(review): entries are never removed, so the map grows
/// with the number of distinct names seen — confirm this is bounded in practice.
type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
67
/// Resolves extracted entity mentions against the knowledge graph, trying
/// exact matching, embedding similarity, then LLM disambiguation in turn.
pub struct EntityResolver<'a> {
    /// Backing graph store (entities, aliases, edges).
    store: &'a GraphStore,
    /// Optional vector store; embedding-based resolution is skipped when `None`.
    embedding_store: Option<&'a Arc<EmbeddingStore>>,
    /// Optional LLM provider used for embeddings and disambiguation.
    provider: Option<&'a AnyProvider>,
    /// Cosine score at or above which a candidate is merged automatically.
    similarity_threshold: f32,
    /// Cosine score at or above which the LLM is consulted ("ambiguous band").
    ambiguous_threshold: f32,
    /// Per-name mutexes preventing duplicate creation under concurrency.
    name_locks: NameLockMap,
    /// Counts embed/search/LLM failures that degraded to fallback paths.
    fallback_count: Arc<std::sync::atomic::AtomicU64>,
    /// Ensures the Qdrant entity collection is created at most once.
    collection_ensured: Arc<tokio::sync::OnceCell<()>>,
}
82
83impl<'a> EntityResolver<'a> {
84 #[must_use]
85 pub fn new(store: &'a GraphStore) -> Self {
86 Self {
87 store,
88 embedding_store: None,
89 provider: None,
90 similarity_threshold: 0.85,
91 ambiguous_threshold: 0.70,
92 name_locks: Arc::new(DashMap::new()),
93 fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
94 collection_ensured: Arc::new(tokio::sync::OnceCell::new()),
95 }
96 }
97
98 #[must_use]
99 pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
100 self.embedding_store = Some(store);
101 self
102 }
103
104 #[must_use]
105 pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
106 self.provider = Some(provider);
107 self
108 }
109
110 #[must_use]
111 pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
112 self.similarity_threshold = similarity;
113 self.ambiguous_threshold = ambiguous;
114 self
115 }
116
117 #[must_use]
119 pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
120 Arc::clone(&self.fallback_count)
121 }
122
    /// Normalizes an entity name for storage/lookup: trim, lowercase, strip
    /// control characters, then truncate to `MAX_ENTITY_NAME_BYTES`.
    fn normalize_name(name: &str) -> String {
        let lowered = name.trim().to_lowercase();
        let cleaned = strip_control_chars(&lowered);
        let normalized = truncate_to_bytes_ref(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
        // Byte-length shrinkage means truncation occurred.
        if normalized.len() < cleaned.len() {
            tracing::debug!(
                "graph resolver: entity name truncated to {} bytes",
                MAX_ENTITY_NAME_BYTES
            );
        }
        normalized
    }
136
137 fn parse_entity_type(entity_type: &str) -> EntityType {
139 entity_type
140 .trim()
141 .to_lowercase()
142 .parse::<EntityType>()
143 .unwrap_or_else(|_| {
144 tracing::debug!(
145 "graph resolver: unknown entity type {:?}, falling back to Concept",
146 entity_type
147 );
148 EntityType::Concept
149 })
150 }
151
152 async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
154 let lock = self
155 .name_locks
156 .entry(normalized.to_owned())
157 .or_insert_with(|| Arc::new(Mutex::new(())))
158 .clone();
159 lock.lock_owned().await
160 }
161
    /// Resolves an entity mention to a graph entity id, creating the entity
    /// if no match exists.
    ///
    /// Resolution order: exact alias match → exact canonical-name match →
    /// embedding similarity (with optional LLM disambiguation) → create new.
    ///
    /// # Errors
    /// Returns [`MemoryError::GraphStore`] when the normalized name is empty
    /// or shorter than `MIN_ENTITY_NAME_BYTES`, or when a store operation
    /// fails.
    pub async fn resolve(
        &self,
        name: &str,
        entity_type: &str,
        summary: Option<&str>,
    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
        let normalized = Self::normalize_name(name);

        if normalized.is_empty() {
            return Err(MemoryError::GraphStore("empty entity name".into()));
        }

        if normalized.len() < MIN_ENTITY_NAME_BYTES {
            return Err(MemoryError::GraphStore(format!(
                "entity name too short: {normalized:?} ({} bytes, min {MIN_ENTITY_NAME_BYTES})",
                normalized.len()
            )));
        }

        let et = Self::parse_entity_type(entity_type);

        // Surface form keeps the caller's casing for display purposes.
        let surface_name = name.trim().to_owned();

        // Serialize concurrent resolution of the same normalized name so two
        // tasks cannot both decide to create the entity.
        let _guard = self.lock_name(&normalized).await;

        // 1. Exact alias match.
        if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
            self.store
                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
                .await?;
            return Ok((entity.id, ResolutionOutcome::ExactMatch));
        }

        // 2. Exact canonical-name match.
        if let Some(entity) = self.store.find_entity(&normalized, et).await? {
            self.store
                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
                .await?;
            return Ok((entity.id, ResolutionOutcome::ExactMatch));
        }

        // 3. Embedding-similarity path (returns None when no embedding store
        // or provider is configured, or the embed call failed).
        if let Some(outcome) = self
            .resolve_via_embedding(&normalized, name, &surface_name, et, summary)
            .await?
        {
            return Ok(outcome);
        }

        // 4. No match: create a fresh entity. Reached when embedding
        // resolution is disabled or its embed call failed.
        let entity_id = self
            .store
            .upsert_entity(&surface_name, &normalized, et, summary)
            .await?;

        self.register_aliases(entity_id, &normalized, name).await?;

        Ok((entity_id, ResolutionOutcome::Created))
    }
241
    /// Embeds `"{normalized}: {summary}"` with a hard timeout.
    ///
    /// Returns `None` (and bumps `fallback_count`) when the embed call errors
    /// or exceeds `EMBED_TIMEOUT_SECS`; callers then degrade to
    /// exact-match-only behavior.
    async fn embed_entity_text(
        &self,
        provider: &AnyProvider,
        normalized: &str,
        summary: Option<&str>,
    ) -> Option<Vec<f32>> {
        // Bound the summary so the embedding input stays a sane size.
        let safe_summary = truncate_to_bytes_ref(summary.unwrap_or(""), MAX_FACT_BYTES);
        let embed_text = format!("{normalized}: {safe_summary}");
        let embed_result = tokio::time::timeout(
            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
            provider.embed(&embed_text),
        )
        .await;
        match embed_result {
            Ok(Ok(v)) => Some(v),
            Ok(Err(err)) => {
                self.fallback_count
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                tracing::warn!(entity_name = %normalized, error = %err,
                    "embed() failed; falling back to exact-match-only entity creation");
                None
            }
            Err(_timeout) => {
                self.fallback_count
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                tracing::warn!(entity_name = %normalized,
                    "embed() timed out after {}s; falling back to create new entity",
                    EMBED_TIMEOUT_SECS);
                None
            }
        }
    }
276
277 #[allow(clippy::too_many_arguments)] async fn handle_ambiguous_candidate(
281 &self,
282 emb_store: &EmbeddingStore,
283 provider: &AnyProvider,
284 payload: &std::collections::HashMap<String, serde_json::Value>,
285 point_id: &str,
286 score: f32,
287 surface_name: &str,
288 normalized: &str,
289 et: EntityType,
290 summary: Option<&str>,
291 ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
292 let entity_id = payload
293 .get("entity_id")
294 .and_then(serde_json::Value::as_i64)
295 .ok_or_else(|| MemoryError::GraphStore("missing entity_id in payload".into()))?;
296 let existing_name = payload
297 .get("name")
298 .and_then(|v| v.as_str())
299 .unwrap_or("")
300 .to_owned();
301 let existing_summary = payload
302 .get("summary")
303 .and_then(|v| v.as_str())
304 .unwrap_or("")
305 .to_owned();
306 let existing_type = payload
308 .get("entity_type")
309 .and_then(|v| v.as_str())
310 .unwrap_or(et.as_str())
311 .to_owned();
312 let existing_canonical = payload.get("canonical_name").and_then(|v| v.as_str());
313 let existing_summary_str = payload.get("summary").and_then(|v| v.as_str());
314 match self
315 .llm_disambiguate(
316 provider,
317 normalized,
318 et.as_str(),
319 summary.unwrap_or(""),
320 &existing_name,
321 &existing_type,
322 &existing_summary,
323 score,
324 )
325 .await
326 {
327 Some(true) => {
328 self.merge_entity(
329 emb_store,
330 provider,
331 entity_id,
332 surface_name,
333 normalized,
334 et,
335 summary,
336 existing_canonical,
337 existing_summary_str,
338 Some(point_id),
339 )
340 .await?;
341 Ok(Some((entity_id, ResolutionOutcome::LlmDisambiguated)))
342 }
343 Some(false) => Ok(None),
344 None => {
345 self.fallback_count
346 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
347 tracing::warn!(entity_name = %normalized,
348 "LLM disambiguation failed; falling back to create new entity");
349 Ok(None)
350 }
351 }
352 }
353
    /// Attempts embedding-based resolution of `normalized`.
    ///
    /// Returns `Ok(None)` when no embedding store/provider is configured or
    /// the embed call failed (caller falls back to plain creation).
    /// Otherwise searches the entity collection and:
    /// - score >= `similarity_threshold`: merge into the best candidate;
    /// - score in the ambiguous band: defer to LLM disambiguation;
    /// - otherwise (or on search failure): create a new entity, reusing the
    ///   freshly computed embedding.
    async fn resolve_via_embedding(
        &self,
        normalized: &str,
        original_name: &str,
        surface_name: &str,
        et: EntityType,
        summary: Option<&str>,
    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
        let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) else {
            return Ok(None);
        };

        let Some(query_vec) = self.embed_entity_text(provider, normalized, summary).await else {
            return Ok(None);
        };

        // Restrict the search to candidates of the same entity type.
        let type_filter = VectorFilter {
            must: vec![FieldCondition {
                field: "entity_type".into(),
                value: FieldValue::Text(et.as_str().to_owned()),
            }],
            must_not: vec![],
        };
        let candidates = match emb_store
            .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
            .await
        {
            Ok(c) => c,
            Err(err) => {
                // Search failure is non-fatal: count the fallback and create
                // the entity anyway with the vector we already have.
                self.fallback_count
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                tracing::warn!(entity_name = %normalized, error = %err,
                    "Qdrant search failed; falling back to create new entity");
                return self
                    .create_with_embedding(
                        emb_store,
                        surface_name,
                        normalized,
                        original_name,
                        et,
                        summary,
                        &query_vec,
                    )
                    .await
                    .map(Some);
            }
        };

        if let Some(best) = candidates.first() {
            let score = best.score;
            if score >= self.similarity_threshold {
                // Confident match: merge the mention into the candidate,
                // reusing payload fields to avoid a SQLite round-trip.
                let entity_id = best
                    .payload
                    .get("entity_id")
                    .and_then(serde_json::Value::as_i64)
                    .ok_or_else(|| {
                        MemoryError::GraphStore("missing entity_id in payload".into())
                    })?;
                let existing_canonical =
                    best.payload.get("canonical_name").and_then(|v| v.as_str());
                let existing_summary = best.payload.get("summary").and_then(|v| v.as_str());
                let existing_pid = Some(best.id.as_str());
                self.merge_entity(
                    emb_store,
                    provider,
                    entity_id,
                    surface_name,
                    normalized,
                    et,
                    summary,
                    existing_canonical,
                    existing_summary,
                    existing_pid,
                )
                .await?;
                return Ok(Some((
                    entity_id,
                    ResolutionOutcome::EmbeddingMatch { score },
                )));
            } else if score >= self.ambiguous_threshold
                && let Some(result) = self
                    .handle_ambiguous_candidate(
                        emb_store,
                        provider,
                        &best.payload,
                        &best.id,
                        score,
                        surface_name,
                        normalized,
                        et,
                        summary,
                    )
                    .await?
            {
                return Ok(Some(result));
            }
        }

        // No candidate above the ambiguous threshold, or the LLM said
        // "different": create a new entity with the precomputed embedding.
        self.create_with_embedding(
            emb_store,
            surface_name,
            normalized,
            original_name,
            et,
            summary,
            &query_vec,
        )
        .await
        .map(Some)
    }
468
469 #[allow(clippy::too_many_arguments)] async fn create_with_embedding(
472 &self,
473 emb_store: &EmbeddingStore,
474 surface_name: &str,
475 normalized: &str,
476 original_name: &str,
477 et: EntityType,
478 summary: Option<&str>,
479 query_vec: &[f32],
480 ) -> Result<(i64, ResolutionOutcome), MemoryError> {
481 let entity_id = self
482 .store
483 .upsert_entity(surface_name, normalized, et, summary)
484 .await?;
485 self.register_aliases(entity_id, normalized, original_name)
486 .await?;
487 self.store_entity_embedding(
488 emb_store,
489 entity_id,
490 None,
491 normalized,
492 et,
493 summary.unwrap_or(""),
494 query_vec,
495 )
496 .await;
497 Ok((entity_id, ResolutionOutcome::Created))
498 }
499
    /// Registers lookup aliases for a newly created entity: the normalized
    /// name, plus the cleaned original form when it differs.
    ///
    /// NOTE(review): `original_clean` is built with the same
    /// trim/lowercase/strip/truncate pipeline as `normalize_name`, and every
    /// current caller passes `normalized = normalize_name(original_name)`, so
    /// the inequality below never holds and the second alias is never added.
    /// Confirm whether a case-preserving alias was intended here.
    ///
    /// # Errors
    /// Propagates `add_alias` store failures.
    async fn register_aliases(
        &self,
        entity_id: i64,
        normalized: &str,
        original_name: &str,
    ) -> Result<(), MemoryError> {
        self.store.add_alias(entity_id, normalized).await?;

        let original_trimmed = original_name.trim().to_lowercase();
        let original_clean_str = strip_control_chars(&original_trimmed);
        let original_clean = truncate_to_bytes_ref(&original_clean_str, MAX_ENTITY_NAME_BYTES);
        if original_clean != normalized {
            self.store.add_alias(entity_id, original_clean).await?;
        }

        Ok(())
    }
520
    /// Merges a new mention into existing entity `entity_id`: combines the
    /// summaries, upserts the new surface name under the existing canonical
    /// name, and best-effort re-embeds the merged representation.
    ///
    /// `existing_*` arguments come from the Qdrant payload when available;
    /// when either canonical name or summary is missing, the entity row is
    /// fetched from the graph store to fill the gaps.
    ///
    /// # Errors
    /// Propagates store lookup/upsert failures. Embed failures are logged,
    /// not returned — the Qdrant entry may then be stale.
    #[allow(clippy::too_many_arguments)]
    async fn merge_entity(
        &self,
        emb_store: &EmbeddingStore,
        provider: &AnyProvider,
        entity_id: i64,
        new_surface_name: &str,
        new_canonical_name: &str,
        entity_type: EntityType,
        new_summary: Option<&str>,
        existing_canonical_name: Option<&str>,
        existing_summary_payload: Option<&str>,
        existing_point_id: Option<&str>,
    ) -> Result<(), MemoryError> {
        // Prefer payload-supplied values; otherwise fall back to a store
        // lookup for whichever pieces are missing.
        let (existing_canonical, existing_summary, existing_point_id_owned) =
            if existing_canonical_name.is_some() && existing_summary_payload.is_some() {
                (
                    existing_canonical_name
                        .unwrap_or(new_canonical_name)
                        .to_owned(),
                    existing_summary_payload.unwrap_or("").to_owned(),
                    existing_point_id.map(ToOwned::to_owned),
                )
            } else {
                let existing = self.store.find_entity_by_id(entity_id).await?;
                let canonical = existing_canonical_name.map_or_else(
                    || {
                        existing.as_ref().map_or_else(
                            || new_canonical_name.to_owned(),
                            |e| e.canonical_name.clone(),
                        )
                    },
                    ToOwned::to_owned,
                );
                let summary = existing
                    .as_ref()
                    .and_then(|e| e.summary.as_deref())
                    .unwrap_or("")
                    .to_owned();
                let pid = existing_point_id.map(ToOwned::to_owned).or_else(|| {
                    existing
                        .as_ref()
                        .and_then(|e| e.qdrant_point_id.as_deref())
                        .map(ToOwned::to_owned)
                });
                (canonical, summary, pid)
            };

        // Concatenate non-empty summaries as "old; new", bounded to
        // MAX_FACT_BYTES; otherwise keep whichever side is non-empty.
        let merged_summary = if let Some(new) = new_summary {
            if !new.is_empty() && !existing_summary.is_empty() {
                let combined = format!("{existing_summary}; {new}");
                truncate_to_bytes_ref(&combined, MAX_FACT_BYTES).to_owned()
            } else if !new.is_empty() {
                new.to_owned()
            } else {
                existing_summary.clone()
            }
        } else {
            existing_summary.clone()
        };

        let summary_opt = if merged_summary.is_empty() {
            None
        } else {
            Some(merged_summary.as_str())
        };

        // Record the new surface form under the existing canonical name.
        self.store
            .upsert_entity(
                new_surface_name,
                &existing_canonical,
                entity_type,
                summary_opt,
            )
            .await?;

        // Re-embed the merged representation; failure/timeout leaves the
        // Qdrant entry stale but does not fail the merge.
        let embed_text = format!("{new_surface_name}: {merged_summary}");
        let embed_result = tokio::time::timeout(
            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
            provider.embed(&embed_text),
        )
        .await;

        match embed_result {
            Ok(Ok(vec)) => {
                self.store_entity_embedding(
                    emb_store,
                    entity_id,
                    existing_point_id_owned.as_deref(),
                    new_surface_name,
                    entity_type,
                    &merged_summary,
                    &vec,
                )
                .await;
            }
            Ok(Err(err)) => {
                tracing::warn!(
                    entity_id,
                    error = %err,
                    "merge re-embed failed; Qdrant entry may be stale"
                );
            }
            Err(_) => {
                tracing::warn!(
                    entity_id,
                    "merge re-embed timed out; Qdrant entry may be stale"
                );
            }
        }

        Ok(())
    }
652
    /// Best-effort upsert of an entity embedding into the Qdrant entity
    /// collection; every failure is logged rather than returned, so embedding
    /// storage never blocks entity creation.
    ///
    /// With `existing_point_id` the existing point is overwritten; otherwise
    /// a new point is created and its id persisted back to the graph store.
    #[allow(clippy::too_many_arguments)]
    async fn store_entity_embedding(
        &self,
        emb_store: &EmbeddingStore,
        entity_id: i64,
        existing_point_id: Option<&str>,
        name: &str,
        entity_type: EntityType,
        summary: &str,
        vector: &[f32],
    ) {
        // NOTE(review): usize -> u64 cannot fail on 64-bit targets; 384 looks
        // like a default embedding dimension fallback — confirm.
        let vector_size = u64::try_from(vector.len()).unwrap_or(384);
        // Create the collection at most once per resolver instance.
        let collection_ensured = Arc::clone(&self.collection_ensured);
        if let Err(err) = collection_ensured
            .get_or_try_init(|| async {
                emb_store
                    .ensure_named_collection(ENTITY_COLLECTION, vector_size)
                    .await
            })
            .await
        {
            tracing::error!(
                error = %err,
                "failed to ensure entity embedding collection; skipping Qdrant upsert"
            );
            return;
        }

        let payload = serde_json::json!({
            "entity_id": entity_id,
            // String copy alongside the integer id for clients/filters that
            // match on strings.
            "entity_id_str": entity_id.to_string(),
            "canonical_name": name,
            "name": name,
            "entity_type": entity_type.as_str(),
            "summary": summary,
        });

        if let Some(point_id) = existing_point_id {
            // Overwrite the known point in place.
            if let Err(err) = emb_store
                .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
                .await
            {
                tracing::warn!(
                    entity_id,
                    error = %err,
                    "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
                );
            }
        } else {
            // New point: store it, then persist the generated id so future
            // merges can update in place.
            match emb_store
                .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
                .await
            {
                Ok(point_id) => {
                    if let Err(err) = self
                        .store
                        .set_entity_qdrant_point_id(entity_id, &point_id)
                        .await
                    {
                        tracing::warn!(
                            entity_id,
                            error = %err,
                            "failed to store qdrant_point_id in SQLite"
                        );
                    }
                }
                Err(err) => {
                    tracing::warn!(
                        entity_id,
                        error = %err,
                        "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
                    );
                }
            }
        }
    }
745
746 #[allow(clippy::too_many_arguments)] async fn llm_disambiguate(
751 &self,
752 provider: &AnyProvider,
753 new_name: &str,
754 new_type: &str,
755 new_summary: &str,
756 existing_name: &str,
757 existing_type: &str,
758 existing_summary: &str,
759 score: f32,
760 ) -> Option<bool> {
761 let prompt = format!(
762 "New entity:\n\
763 - Name: <external-data>{new_name}</external-data>\n\
764 - Type: <external-data>{new_type}</external-data>\n\
765 - Summary: <external-data>{new_summary}</external-data>\n\
766 \n\
767 Existing entity:\n\
768 - Name: <external-data>{existing_name}</external-data>\n\
769 - Type: <external-data>{existing_type}</external-data>\n\
770 - Summary: <external-data>{existing_summary}</external-data>\n\
771 \n\
772 Cosine similarity: {score:.2}\n\
773 \n\
774 Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
775 );
776
777 let messages = [
778 Message::from_legacy(
779 Role::System,
780 "You are an entity disambiguation assistant. Given a new entity mention and \
781 an existing entity from the knowledge graph, determine if they refer to the same \
782 real-world entity. Respond only with JSON.",
783 ),
784 Message::from_legacy(Role::User, prompt),
785 ];
786
787 let response = match provider.chat(&messages).await {
788 Ok(r) => r,
789 Err(err) => {
790 tracing::warn!(error = %err, "LLM disambiguation chat failed");
791 return None;
792 }
793 };
794
795 let json_str = extract_json(&response);
797 match serde_json::from_str::<DisambiguationResponse>(json_str) {
798 Ok(parsed) => Some(parsed.same_entity),
799 Err(err) => {
800 tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
801 None
802 }
803 }
804 }
805
806 pub async fn resolve_batch(
819 &self,
820 entities: &[ExtractedEntity],
821 ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
822 if entities.is_empty() {
823 return Ok(Vec::new());
824 }
825
826 let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
828
829 let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
830 let name = e.name.clone();
831 let entity_type = e.entity_type.clone();
832 let summary = e.summary.clone();
833 async move {
834 let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
835 (i, result)
836 }
837 }))
838 .buffer_unordered(4);
839
840 while let Some((i, result)) = stream.next().await {
841 match result {
842 Ok(outcome) => results[i] = Some(outcome),
843 Err(err) => return Err(err),
844 }
845 }
846
847 Ok(results
848 .into_iter()
849 .enumerate()
850 .map(|(i, r)| {
851 r.unwrap_or_else(|| {
852 tracing::warn!(
853 index = i,
854 "resolve_batch: missing result at index — bug in stream collection"
855 );
856 panic!("resolve_batch: missing result at index {i}")
857 })
858 })
859 .collect())
860 }
861
862 pub async fn resolve_edge(
876 &self,
877 source_id: i64,
878 target_id: i64,
879 relation: &str,
880 fact: &str,
881 confidence: f32,
882 episode_id: Option<MessageId>,
883 ) -> Result<Option<i64>, MemoryError> {
884 let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
885 let normalized_relation =
886 truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
887
888 let fact_clean = strip_control_chars(fact.trim());
889 let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
890
891 let existing_edges = self.store.edges_exact(source_id, target_id).await?;
893
894 let matching = existing_edges
895 .iter()
896 .find(|e| e.relation == normalized_relation);
897
898 if let Some(old) = matching {
899 if old.fact == normalized_fact {
900 return Ok(None);
902 }
903 self.store.invalidate_edge(old.id).await?;
905 }
906
907 let new_id = self
908 .store
909 .insert_edge(
910 source_id,
911 target_id,
912 &normalized_relation,
913 &normalized_fact,
914 confidence,
915 episode_id,
916 )
917 .await?;
918 Ok(Some(new_id))
919 }
920
    /// Typed variant of [`Self::resolve_edge`]: inserts an edge with an
    /// explicit edge type, optionally applying embedding-based belief
    /// revision to find and supersede semantically contradicted edges.
    ///
    /// Duplicate detection matches on (relation, edge_type); an identical
    /// fact is a no-op (`Ok(None)`). With belief revision configured and a
    /// provider available, superseded edges are invalidated with a
    /// supersession link to the new edge; otherwise only the exact-matching
    /// edge is invalidated.
    ///
    /// # Errors
    /// Propagates graph-store failures. Embed failures/timeouts degrade to
    /// exact-match invalidation rather than erroring.
    #[allow(clippy::too_many_arguments)]
    pub async fn resolve_edge_typed(
        &self,
        source_id: i64,
        target_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<crate::types::MessageId>,
        edge_type: crate::graph::EdgeType,
        belief_revision: Option<&crate::graph::BeliefRevisionConfig>,
    ) -> Result<Option<i64>, MemoryError> {
        // Relation is lowercased; facts keep their original casing.
        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
        let normalized_relation =
            truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();

        let fact_clean = strip_control_chars(fact.trim());
        let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();

        let existing_edges = self.store.edges_exact(source_id, target_id).await?;

        let matching = existing_edges
            .iter()
            .find(|e| e.relation == normalized_relation && e.edge_type == edge_type);

        // Identical (relation, type, fact) already present: nothing to do.
        if matching.is_some_and(|old| old.fact == normalized_fact) {
            return Ok(None);
        }

        // Decide which old edges the new fact supersedes. Every failure mode
        // in the belief-revision path degrades to the exact-match edge only.
        let superseded_ids: Vec<i64> = if let (Some(cfg), Some(provider)) =
            (belief_revision, self.provider)
        {
            match tokio::time::timeout(
                std::time::Duration::from_secs(5),
                provider.embed(&normalized_fact),
            )
            .await
            {
                Ok(Ok(new_emb)) => {
                    match crate::graph::belief_revision::find_superseded_edges(
                        &existing_edges,
                        &new_emb,
                        &normalized_relation,
                        edge_type,
                        provider,
                        cfg,
                    )
                    .await
                    {
                        Ok(ids) => ids,
                        Err(err) => {
                            tracing::warn!(error = %err,
                                "belief_revision: find_superseded_edges failed, falling back to exact match");
                            matching.map(|e| vec![e.id]).unwrap_or_default()
                        }
                    }
                }
                Ok(Err(err)) => {
                    tracing::warn!(error = %err,
                        "belief_revision: embed new fact failed, falling back to exact match");
                    matching.map(|e| vec![e.id]).unwrap_or_default()
                }
                Err(_) => {
                    tracing::warn!(
                        "belief_revision: embed new fact timed out, falling back to exact match"
                    );
                    matching.map(|e| vec![e.id]).unwrap_or_default()
                }
            }
        } else {
            matching.map(|e| vec![e.id]).unwrap_or_default()
        };

        // Insert the new edge first so supersession links can reference it.
        let new_id = self
            .store
            .insert_edge_typed(
                source_id,
                target_id,
                &normalized_relation,
                &normalized_fact,
                confidence,
                episode_id,
                edge_type,
            )
            .await?;

        for old_id in superseded_ids {
            if belief_revision.is_some() {
                self.store
                    .invalidate_edge_with_supersession(old_id, new_id)
                    .await?;
            } else {
                self.store.invalidate_edge(old_id).await?;
            }
        }

        Ok(Some(new_id))
    }
1040}
1041
/// Extracts the JSON payload from an LLM response.
///
/// Strips a ```json or ``` fence when a closing fence is present; otherwise
/// falls back to the outermost `{...}` span; otherwise returns the trimmed
/// input unchanged.
fn extract_json(s: &str) -> &str {
    let trimmed = s.trim();
    // Try the more specific fence first so "```json" is not half-stripped.
    for fence in ["```json", "```"] {
        if let Some(inner) = trimmed.strip_prefix(fence) {
            if let Some(end) = inner.rfind("```") {
                return inner[..end].trim();
            }
        }
    }
    let start = trimmed.find('{');
    let end = trimmed.rfind('}');
    if let (Some(start), Some(end)) = (start, end) {
        if start <= end {
            return &trimmed[start..=end];
        }
    }
    trimmed
}
1064
1065#[cfg(test)]
1066mod tests;