1use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_common::sanitize::strip_control_chars;
12use zeph_common::text::truncate_to_bytes_ref;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{LlmProvider as _, Message, Role};
15
16use super::store::GraphStore;
17use super::types::EntityType;
18use crate::embedding_store::EmbeddingStore;
19use crate::error::MemoryError;
20use crate::graph::extractor::ExtractedEntity;
21use crate::types::MessageId;
22use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
23
/// Shortest normalized entity name, in bytes, that `EntityResolver::resolve` accepts.
const MIN_ENTITY_NAME_BYTES: usize = 3;
/// Byte cap applied to normalized entity names and aliases.
const MAX_ENTITY_NAME_BYTES: usize = 512;
/// Byte cap applied to normalized edge relation labels.
const MAX_RELATION_BYTES: usize = 256;
/// Byte cap applied to edge facts and to entity summaries (embedded or merged).
const MAX_FACT_BYTES: usize = 2048;

/// Name of the embedding-store collection that holds entity vectors.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Upper bound, in seconds, on a single entity `embed()` call.
const EMBED_TIMEOUT_SECS: u64 = 30;
38
/// How `EntityResolver::resolve` arrived at the entity id it returned.
#[derive(Debug, Clone, PartialEq)]
pub enum ResolutionOutcome {
    /// An existing entity was found by alias or by normalized name.
    ExactMatch,
    /// The best embedding-search candidate scored at or above the similarity
    /// threshold; `score` is that candidate's score.
    EmbeddingMatch { score: f32 },
    /// The best candidate scored in the ambiguous band and the LLM confirmed
    /// it is the same entity.
    LlmDisambiguated,
    /// No existing entity matched; a new one was upserted.
    Created,
}
51
52#[derive(Debug, Deserialize, JsonSchema)]
54struct DisambiguationResponse {
55 same_entity: bool,
56}
57
58type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
67
68pub struct EntityResolver<'a> {
69 store: &'a GraphStore,
70 embedding_store: Option<&'a Arc<EmbeddingStore>>,
71 provider: Option<&'a AnyProvider>,
72 similarity_threshold: f32,
73 ambiguous_threshold: f32,
74 name_locks: NameLockMap,
75 fallback_count: Arc<std::sync::atomic::AtomicU64>,
77 collection_ensured: Arc<tokio::sync::OnceCell<()>>,
81}
82
83impl<'a> EntityResolver<'a> {
84 #[must_use]
86 pub fn graph_store(&self) -> &GraphStore {
87 self.store
88 }
89
90 #[must_use]
91 pub fn new(store: &'a GraphStore) -> Self {
92 Self {
93 store,
94 embedding_store: None,
95 provider: None,
96 similarity_threshold: 0.85,
97 ambiguous_threshold: 0.70,
98 name_locks: Arc::new(DashMap::new()),
99 fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
100 collection_ensured: Arc::new(tokio::sync::OnceCell::new()),
101 }
102 }
103
104 #[must_use]
105 pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
106 self.embedding_store = Some(store);
107 self
108 }
109
110 #[must_use]
111 pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
112 self.provider = Some(provider);
113 self
114 }
115
116 #[must_use]
117 pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
118 self.similarity_threshold = similarity;
119 self.ambiguous_threshold = ambiguous;
120 self
121 }
122
123 #[must_use]
125 pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
126 Arc::clone(&self.fallback_count)
127 }
128
129 fn normalize_name(name: &str) -> String {
131 let lowered = name.trim().to_lowercase();
132 let cleaned = strip_control_chars(&lowered);
133 let normalized = truncate_to_bytes_ref(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
134 if normalized.len() < cleaned.len() {
135 tracing::debug!(
136 "graph resolver: entity name truncated to {} bytes",
137 MAX_ENTITY_NAME_BYTES
138 );
139 }
140 normalized
141 }
142
143 fn parse_entity_type(entity_type: &str) -> EntityType {
145 entity_type
146 .trim()
147 .to_lowercase()
148 .parse::<EntityType>()
149 .unwrap_or_else(|_| {
150 tracing::debug!(
151 "graph resolver: unknown entity type {:?}, falling back to Concept",
152 entity_type
153 );
154 EntityType::Concept
155 })
156 }
157
158 async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
160 let lock = self
161 .name_locks
162 .entry(normalized.to_owned())
163 .or_insert_with(|| Arc::new(Mutex::new(())))
164 .clone();
165 lock.lock_owned().await
166 }
167
168 pub async fn resolve(
187 &self,
188 name: &str,
189 entity_type: &str,
190 summary: Option<&str>,
191 ) -> Result<(i64, ResolutionOutcome), MemoryError> {
192 let normalized = Self::normalize_name(name);
193
194 if normalized.is_empty() {
195 return Err(MemoryError::GraphStore("empty entity name".into()));
196 }
197
198 if normalized.len() < MIN_ENTITY_NAME_BYTES {
199 return Err(MemoryError::GraphStore(format!(
200 "entity name too short: {normalized:?} ({} bytes, min {MIN_ENTITY_NAME_BYTES})",
201 normalized.len()
202 )));
203 }
204
205 let et = Self::parse_entity_type(entity_type);
206
207 let surface_name = name.trim().to_owned();
209
210 let _guard = self.lock_name(&normalized).await;
212
213 if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
215 self.store
216 .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
217 .await?;
218 return Ok((entity.id.0, ResolutionOutcome::ExactMatch));
219 }
220
221 if let Some(entity) = self.store.find_entity(&normalized, et).await? {
223 self.store
224 .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
225 .await?;
226 return Ok((entity.id.0, ResolutionOutcome::ExactMatch));
227 }
228
229 if let Some(outcome) = self
231 .resolve_via_embedding(&normalized, name, &surface_name, et, summary)
232 .await?
233 {
234 return Ok(outcome);
235 }
236
237 let entity_id = self
239 .store
240 .upsert_entity(&surface_name, &normalized, et, summary)
241 .await?;
242
243 self.register_aliases(entity_id.0, &normalized, name)
244 .await?;
245
246 Ok((entity_id.0, ResolutionOutcome::Created))
247 }
248
249 async fn embed_entity_text(
252 &self,
253 provider: &AnyProvider,
254 normalized: &str,
255 summary: Option<&str>,
256 ) -> Option<Vec<f32>> {
257 let safe_summary = truncate_to_bytes_ref(summary.unwrap_or(""), MAX_FACT_BYTES);
258 let embed_text = format!("{normalized}: {safe_summary}");
259 let embed_result = tokio::time::timeout(
260 std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
261 provider.embed(&embed_text),
262 )
263 .await;
264 match embed_result {
265 Ok(Ok(v)) => Some(v),
266 Ok(Err(err)) => {
267 self.fallback_count
268 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
269 tracing::warn!(entity_name = %normalized, error = %err,
270 "embed() failed; falling back to exact-match-only entity creation");
271 None
272 }
273 Err(_timeout) => {
274 self.fallback_count
275 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
276 tracing::warn!(entity_name = %normalized,
277 "embed() timed out after {}s; falling back to create new entity",
278 EMBED_TIMEOUT_SECS);
279 None
280 }
281 }
282 }
283
284 #[allow(clippy::too_many_arguments)] async fn handle_ambiguous_candidate(
288 &self,
289 emb_store: &EmbeddingStore,
290 provider: &AnyProvider,
291 payload: &std::collections::HashMap<String, serde_json::Value>,
292 point_id: &str,
293 score: f32,
294 surface_name: &str,
295 normalized: &str,
296 et: EntityType,
297 summary: Option<&str>,
298 ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
299 let entity_id = payload
300 .get("entity_id")
301 .and_then(serde_json::Value::as_i64)
302 .ok_or_else(|| MemoryError::GraphStore("missing entity_id in payload".into()))?;
303 let existing_name = payload
304 .get("name")
305 .and_then(|v| v.as_str())
306 .unwrap_or("")
307 .to_owned();
308 let existing_summary = payload
309 .get("summary")
310 .and_then(|v| v.as_str())
311 .unwrap_or("")
312 .to_owned();
313 let existing_type = payload
315 .get("entity_type")
316 .and_then(|v| v.as_str())
317 .unwrap_or(et.as_str())
318 .to_owned();
319 let existing_canonical = payload.get("canonical_name").and_then(|v| v.as_str());
320 let existing_summary_str = payload.get("summary").and_then(|v| v.as_str());
321 match self
322 .llm_disambiguate(
323 provider,
324 normalized,
325 et.as_str(),
326 summary.unwrap_or(""),
327 &existing_name,
328 &existing_type,
329 &existing_summary,
330 score,
331 )
332 .await
333 {
334 Some(true) => {
335 self.merge_entity(
336 emb_store,
337 provider,
338 entity_id,
339 surface_name,
340 normalized,
341 et,
342 summary,
343 existing_canonical,
344 existing_summary_str,
345 Some(point_id),
346 )
347 .await?;
348 Ok(Some((entity_id, ResolutionOutcome::LlmDisambiguated)))
349 }
350 Some(false) => Ok(None),
351 None => {
352 self.fallback_count
353 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
354 tracing::warn!(entity_name = %normalized,
355 "LLM disambiguation failed; falling back to create new entity");
356 Ok(None)
357 }
358 }
359 }
360
361 async fn resolve_via_embedding(
364 &self,
365 normalized: &str,
366 original_name: &str,
367 surface_name: &str,
368 et: EntityType,
369 summary: Option<&str>,
370 ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
371 let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) else {
372 return Ok(None);
373 };
374
375 let Some(query_vec) = self.embed_entity_text(provider, normalized, summary).await else {
376 return Ok(None);
377 };
378
379 let type_filter = VectorFilter {
380 must: vec![FieldCondition {
381 field: "entity_type".into(),
382 value: FieldValue::Text(et.as_str().to_owned()),
383 }],
384 must_not: vec![],
385 };
386 let candidates = match emb_store
387 .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
388 .await
389 {
390 Ok(c) => c,
391 Err(err) => {
392 self.fallback_count
393 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
394 tracing::warn!(entity_name = %normalized, error = %err,
395 "Qdrant search failed; falling back to create new entity");
396 return self
397 .create_with_embedding(
398 emb_store,
399 surface_name,
400 normalized,
401 original_name,
402 et,
403 summary,
404 &query_vec,
405 )
406 .await
407 .map(Some);
408 }
409 };
410
411 if let Some(best) = candidates.first() {
412 let score = best.score;
413 if score >= self.similarity_threshold {
414 let entity_id = best
415 .payload
416 .get("entity_id")
417 .and_then(serde_json::Value::as_i64)
418 .ok_or_else(|| {
419 MemoryError::GraphStore("missing entity_id in payload".into())
420 })?;
421 let existing_canonical =
422 best.payload.get("canonical_name").and_then(|v| v.as_str());
423 let existing_summary = best.payload.get("summary").and_then(|v| v.as_str());
424 let existing_pid = Some(best.id.as_str());
425 self.merge_entity(
426 emb_store,
427 provider,
428 entity_id,
429 surface_name,
430 normalized,
431 et,
432 summary,
433 existing_canonical,
434 existing_summary,
435 existing_pid,
436 )
437 .await?;
438 return Ok(Some((
439 entity_id,
440 ResolutionOutcome::EmbeddingMatch { score },
441 )));
442 } else if score >= self.ambiguous_threshold
443 && let Some(result) = self
444 .handle_ambiguous_candidate(
445 emb_store,
446 provider,
447 &best.payload,
448 &best.id,
449 score,
450 surface_name,
451 normalized,
452 et,
453 summary,
454 )
455 .await?
456 {
457 return Ok(Some(result));
458 }
459 }
461
462 self.create_with_embedding(
464 emb_store,
465 surface_name,
466 normalized,
467 original_name,
468 et,
469 summary,
470 &query_vec,
471 )
472 .await
473 .map(Some)
474 }
475
476 #[allow(clippy::too_many_arguments)] async fn create_with_embedding(
479 &self,
480 emb_store: &EmbeddingStore,
481 surface_name: &str,
482 normalized: &str,
483 original_name: &str,
484 et: EntityType,
485 summary: Option<&str>,
486 query_vec: &[f32],
487 ) -> Result<(i64, ResolutionOutcome), MemoryError> {
488 let entity_id = self
489 .store
490 .upsert_entity(surface_name, normalized, et, summary)
491 .await?;
492 self.register_aliases(entity_id.0, normalized, original_name)
493 .await?;
494 self.store_entity_embedding(
495 emb_store,
496 entity_id.0,
497 None,
498 normalized,
499 et,
500 summary.unwrap_or(""),
501 query_vec,
502 )
503 .await;
504 Ok((entity_id.0, ResolutionOutcome::Created))
505 }
506
507 async fn register_aliases(
509 &self,
510 entity_id: i64,
511 normalized: &str,
512 original_name: &str,
513 ) -> Result<(), MemoryError> {
514 self.store.add_alias(entity_id, normalized).await?;
515
516 let original_trimmed = original_name.trim().to_lowercase();
519 let original_clean_str = strip_control_chars(&original_trimmed);
520 let original_clean = truncate_to_bytes_ref(&original_clean_str, MAX_ENTITY_NAME_BYTES);
521 if original_clean != normalized {
522 self.store.add_alias(entity_id, original_clean).await?;
523 }
524
525 Ok(())
526 }
527
528 #[allow(clippy::too_many_arguments)] async fn merge_entity(
536 &self,
537 emb_store: &EmbeddingStore,
538 provider: &AnyProvider,
539 entity_id: i64,
540 new_surface_name: &str,
541 new_canonical_name: &str,
542 entity_type: EntityType,
543 new_summary: Option<&str>,
544 existing_canonical_name: Option<&str>,
545 existing_summary_payload: Option<&str>,
546 existing_point_id: Option<&str>,
547 ) -> Result<(), MemoryError> {
548 let (existing_canonical, existing_summary, existing_point_id_owned) =
553 if existing_canonical_name.is_some() && existing_summary_payload.is_some() {
554 (
555 existing_canonical_name
556 .unwrap_or(new_canonical_name)
557 .to_owned(),
558 existing_summary_payload.unwrap_or("").to_owned(),
559 existing_point_id.map(ToOwned::to_owned),
560 )
561 } else {
562 let existing = self.store.find_entity_by_id(entity_id).await?;
567 let canonical = existing_canonical_name.map_or_else(
568 || {
569 existing.as_ref().map_or_else(
570 || new_canonical_name.to_owned(),
571 |e| e.canonical_name.clone(),
572 )
573 },
574 ToOwned::to_owned,
575 );
576 let summary = existing
577 .as_ref()
578 .and_then(|e| e.summary.as_deref())
579 .unwrap_or("")
580 .to_owned();
581 let pid = existing_point_id.map(ToOwned::to_owned).or_else(|| {
582 existing
583 .as_ref()
584 .and_then(|e| e.qdrant_point_id.as_deref())
585 .map(ToOwned::to_owned)
586 });
587 (canonical, summary, pid)
588 };
589
590 let merged_summary = if let Some(new) = new_summary {
591 if !new.is_empty() && !existing_summary.is_empty() {
592 let combined = format!("{existing_summary}; {new}");
593 truncate_to_bytes_ref(&combined, MAX_FACT_BYTES).to_owned()
595 } else if !new.is_empty() {
596 new.to_owned()
597 } else {
598 existing_summary.clone()
599 }
600 } else {
601 existing_summary.clone()
602 };
603
604 let summary_opt = if merged_summary.is_empty() {
605 None
606 } else {
607 Some(merged_summary.as_str())
608 };
609
610 self.store
613 .upsert_entity(
614 new_surface_name,
615 &existing_canonical,
616 entity_type,
617 summary_opt,
618 )
619 .await?;
620
621 let embed_text = format!("{new_surface_name}: {merged_summary}");
623 let embed_result = tokio::time::timeout(
624 std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
625 provider.embed(&embed_text),
626 )
627 .await;
628
629 match embed_result {
630 Ok(Ok(vec)) => {
631 self.store_entity_embedding(
632 emb_store,
633 entity_id,
634 existing_point_id_owned.as_deref(),
635 new_surface_name,
636 entity_type,
637 &merged_summary,
638 &vec,
639 )
640 .await;
641 }
642 Ok(Err(err)) => {
643 tracing::warn!(
644 entity_id,
645 error = %err,
646 "merge re-embed failed; Qdrant entry may be stale"
647 );
648 }
649 Err(_) => {
650 tracing::warn!(
651 entity_id,
652 "merge re-embed timed out; Qdrant entry may be stale"
653 );
654 }
655 }
656
657 Ok(())
658 }
659
660 #[allow(clippy::too_many_arguments)] async fn store_entity_embedding(
669 &self,
670 emb_store: &EmbeddingStore,
671 entity_id: i64,
672 existing_point_id: Option<&str>,
673 name: &str,
674 entity_type: EntityType,
675 summary: &str,
676 vector: &[f32],
677 ) {
678 let vector_size = u64::try_from(vector.len()).unwrap_or(384);
684 let collection_ensured = Arc::clone(&self.collection_ensured);
685 if let Err(err) = collection_ensured
686 .get_or_try_init(|| async {
687 emb_store
688 .ensure_named_collection(ENTITY_COLLECTION, vector_size)
689 .await
690 })
691 .await
692 {
693 tracing::error!(
694 error = %err,
695 "failed to ensure entity embedding collection; skipping Qdrant upsert"
696 );
697 return;
698 }
699
700 let payload = serde_json::json!({
701 "entity_id": entity_id,
702 "entity_id_str": entity_id.to_string(),
706 "canonical_name": name,
707 "name": name,
708 "entity_type": entity_type.as_str(),
709 "summary": summary,
710 });
711
712 if let Some(point_id) = existing_point_id {
713 if let Err(err) = emb_store
715 .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
716 .await
717 {
718 tracing::warn!(
719 entity_id,
720 error = %err,
721 "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
722 );
723 }
724 } else {
725 match emb_store
726 .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
727 .await
728 {
729 Ok(point_id) => {
730 if let Err(err) = self
731 .store
732 .set_entity_qdrant_point_id(entity_id, &point_id)
733 .await
734 {
735 tracing::warn!(
736 entity_id,
737 error = %err,
738 "failed to store qdrant_point_id in SQLite"
739 );
740 }
741 }
742 Err(err) => {
743 tracing::warn!(
744 entity_id,
745 error = %err,
746 "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
747 );
748 }
749 }
750 }
751 }
752
753 #[allow(clippy::too_many_arguments)] async fn llm_disambiguate(
758 &self,
759 provider: &AnyProvider,
760 new_name: &str,
761 new_type: &str,
762 new_summary: &str,
763 existing_name: &str,
764 existing_type: &str,
765 existing_summary: &str,
766 score: f32,
767 ) -> Option<bool> {
768 let prompt = format!(
769 "New entity:\n\
770 - Name: <external-data>{new_name}</external-data>\n\
771 - Type: <external-data>{new_type}</external-data>\n\
772 - Summary: <external-data>{new_summary}</external-data>\n\
773 \n\
774 Existing entity:\n\
775 - Name: <external-data>{existing_name}</external-data>\n\
776 - Type: <external-data>{existing_type}</external-data>\n\
777 - Summary: <external-data>{existing_summary}</external-data>\n\
778 \n\
779 Cosine similarity: {score:.2}\n\
780 \n\
781 Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
782 );
783
784 let messages = [
785 Message::from_legacy(
786 Role::System,
787 "You are an entity disambiguation assistant. Given a new entity mention and \
788 an existing entity from the knowledge graph, determine if they refer to the same \
789 real-world entity. Respond only with JSON.",
790 ),
791 Message::from_legacy(Role::User, prompt),
792 ];
793
794 let response = match provider.chat(&messages).await {
795 Ok(r) => r,
796 Err(err) => {
797 tracing::warn!(error = %err, "LLM disambiguation chat failed");
798 return None;
799 }
800 };
801
802 let json_str = extract_json(&response);
804 match serde_json::from_str::<DisambiguationResponse>(json_str) {
805 Ok(parsed) => Some(parsed.same_entity),
806 Err(err) => {
807 tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
808 None
809 }
810 }
811 }
812
813 pub async fn resolve_batch(
826 &self,
827 entities: &[ExtractedEntity],
828 ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
829 if entities.is_empty() {
830 return Ok(Vec::new());
831 }
832
833 let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
835
836 let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
837 let name = e.name.clone();
838 let entity_type = e.entity_type.clone();
839 let summary = e.summary.clone();
840 async move {
841 let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
842 (i, result)
843 }
844 }))
845 .buffer_unordered(4);
846
847 while let Some((i, result)) = stream.next().await {
848 match result {
849 Ok(outcome) => results[i] = Some(outcome),
850 Err(err) => return Err(err),
851 }
852 }
853
854 Ok(results
855 .into_iter()
856 .enumerate()
857 .map(|(i, r)| {
858 r.unwrap_or_else(|| {
859 tracing::warn!(
860 index = i,
861 "resolve_batch: missing result at index — bug in stream collection"
862 );
863 panic!("resolve_batch: missing result at index {i}")
864 })
865 })
866 .collect())
867 }
868
869 pub async fn resolve_edge(
883 &self,
884 source_id: i64,
885 target_id: i64,
886 relation: &str,
887 fact: &str,
888 confidence: f32,
889 episode_id: Option<MessageId>,
890 ) -> Result<Option<i64>, MemoryError> {
891 let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
892 let normalized_relation =
893 truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
894
895 let fact_clean = strip_control_chars(fact.trim());
896 let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
897
898 let existing_edges = self.store.edges_exact(source_id, target_id).await?;
900
901 let matching = existing_edges
902 .iter()
903 .find(|e| e.relation == normalized_relation);
904
905 if let Some(old) = matching {
906 if old.fact == normalized_fact {
907 return Ok(None);
909 }
910 self.store.invalidate_edge(old.id).await?;
912 }
913
914 let new_id = self
915 .store
916 .insert_edge(
917 source_id,
918 target_id,
919 &normalized_relation,
920 &normalized_fact,
921 confidence,
922 episode_id,
923 )
924 .await?;
925 Ok(Some(new_id))
926 }
927
928 #[allow(clippy::too_many_arguments)] pub async fn resolve_edge_typed(
946 &self,
947 source_id: i64,
948 target_id: i64,
949 relation: &str,
950 fact: &str,
951 confidence: f32,
952 episode_id: Option<crate::types::MessageId>,
953 edge_type: crate::graph::EdgeType,
954 belief_revision: Option<&crate::graph::BeliefRevisionConfig>,
955 ) -> Result<Option<i64>, MemoryError> {
956 let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
957 let normalized_relation =
958 truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
959
960 let fact_clean = strip_control_chars(fact.trim());
961 let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
962
963 let existing_edges = self.store.edges_exact(source_id, target_id).await?;
964
965 let matching = existing_edges
967 .iter()
968 .find(|e| e.relation == normalized_relation && e.edge_type == edge_type);
969
970 if matching.is_some_and(|old| old.fact == normalized_fact) {
971 return Ok(None);
972 }
973
974 let superseded_ids: Vec<i64> = if let (Some(cfg), Some(provider)) =
976 (belief_revision, self.provider)
977 {
978 match tokio::time::timeout(
980 std::time::Duration::from_secs(5),
981 provider.embed(&normalized_fact),
982 )
983 .await
984 {
985 Ok(Ok(new_emb)) => {
986 match crate::graph::belief_revision::find_superseded_edges(
987 &existing_edges,
988 &new_emb,
989 &normalized_relation,
990 edge_type,
991 provider,
992 cfg,
993 )
994 .await
995 {
996 Ok(ids) => ids,
997 Err(err) => {
998 tracing::warn!(error = %err,
999 "belief_revision: find_superseded_edges failed, falling back to exact match");
1000 matching.map(|e| vec![e.id]).unwrap_or_default()
1001 }
1002 }
1003 }
1004 Ok(Err(err)) => {
1005 tracing::warn!(error = %err,
1006 "belief_revision: embed new fact failed, falling back to exact match");
1007 matching.map(|e| vec![e.id]).unwrap_or_default()
1008 }
1009 Err(_) => {
1010 tracing::warn!(
1011 "belief_revision: embed new fact timed out, falling back to exact match"
1012 );
1013 matching.map(|e| vec![e.id]).unwrap_or_default()
1014 }
1015 }
1016 } else {
1017 matching.map(|e| vec![e.id]).unwrap_or_default()
1019 };
1020
1021 let new_id = self
1022 .store
1023 .insert_edge_typed(
1024 source_id,
1025 target_id,
1026 &normalized_relation,
1027 &normalized_fact,
1028 confidence,
1029 episode_id,
1030 edge_type,
1031 )
1032 .await?;
1033
1034 for old_id in superseded_ids {
1036 if belief_revision.is_some() {
1037 self.store
1038 .invalidate_edge_with_supersession(old_id, new_id)
1039 .await?;
1040 } else {
1041 self.store.invalidate_edge(old_id).await?;
1042 }
1043 }
1044
1045 Ok(Some(new_id))
1046 }
1047}
1048
/// Extracts the JSON object embedded in a raw LLM reply.
///
/// Handles three shapes:
/// - a reply wrapped in a Markdown code fence, with or without a `json`
///   info string (matched case-insensitively);
/// - a JSON object surrounded by prose;
/// - a bare JSON object.
///
/// The first `{` is matched to its balancing `}` while skipping braces that
/// appear inside JSON strings, so trailing prose that happens to contain
/// braces does not end up in the result. If no balanced object is found the
/// span from the first `{` to the last `}` is returned, and if there are no
/// braces at all the trimmed (and unfenced) input is returned unchanged.
///
/// The result always borrows from `s`; nothing is allocated.
fn extract_json(s: &str) -> &str {
    let trimmed = s.trim();
    let candidate = strip_code_fence(trimmed).unwrap_or(trimmed);

    let start = match candidate.find('{') {
        Some(idx) => idx,
        None => return candidate,
    };

    if let Some(offset) = balanced_object_end(&candidate[start..]) {
        return &candidate[start..=start + offset];
    }

    // Unbalanced input: fall back to the widest brace-delimited span.
    match candidate.rfind('}') {
        Some(end) if start <= end => &candidate[start..=end],
        _ => candidate,
    }
}

/// Returns the trimmed contents of a reply that starts with a code fence and
/// contains a closing fence, with a leading `json` info string removed.
fn strip_code_fence(text: &str) -> Option<&str> {
    let after_open = text.strip_prefix("```")?;
    let close = after_open.rfind("```")?;
    let fenced = &after_open[..close];

    let body = match fenced.get(..4) {
        Some(tag) if tag.eq_ignore_ascii_case("json") => &fenced[4..],
        _ => fenced,
    };
    Some(body.trim())
}

/// Returns the byte offset of the `}` that closes the object opening at the
/// start of `text`, or `None` when the braces never balance.
fn balanced_object_end(text: &str) -> Option<usize> {
    let mut depth = 0usize;
    let mut in_string = false;
    let mut escaped = false;

    // Every byte inspected here is ASCII, so a returned offset is always a
    // valid char boundary.
    for (idx, byte) in text.bytes().enumerate() {
        if in_string {
            if escaped {
                escaped = false;
            } else if byte == b'\\' {
                escaped = true;
            } else if byte == b'"' {
                in_string = false;
            }
            continue;
        }
        match byte {
            b'"' => in_string = true,
            b'{' => depth += 1,
            b'}' => {
                depth = depth.checked_sub(1)?;
                if depth == 0 {
                    return Some(idx);
                }
            }
            _ => {}
        }
    }
    None
}
1071
1072#[cfg(test)]
1073mod tests;