1use std::sync::Arc;
5
6use dashmap::DashMap;
7use futures::stream::{self, StreamExt as _};
8use schemars::JsonSchema;
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use zeph_common::sanitize::strip_control_chars;
12use zeph_common::text::truncate_to_bytes_ref;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{LlmProvider as _, Message, Role};
15
16use super::store::GraphStore;
17use super::types::EntityType;
18use crate::embedding_store::EmbeddingStore;
19use crate::error::MemoryError;
20use crate::graph::extractor::ExtractedEntity;
21use crate::types::MessageId;
22use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
23
24const MIN_ENTITY_NAME_BYTES: usize = 3;
26const MAX_ENTITY_NAME_BYTES: usize = 512;
28const MAX_RELATION_BYTES: usize = 256;
30const MAX_FACT_BYTES: usize = 2048;
32
33const ENTITY_COLLECTION: &str = "zeph_graph_entities";
35
36const EMBED_TIMEOUT_SECS: u64 = 30;
38
/// How an entity mention was resolved against the existing graph.
#[derive(Debug, Clone, PartialEq)]
pub enum ResolutionOutcome {
    /// The normalized name (or a stored alias) matched an existing entity.
    ExactMatch,
    /// An embedding search hit scored at/above the similarity threshold.
    EmbeddingMatch { score: f32 },
    /// An ambiguous embedding hit was confirmed as the same entity by the LLM.
    LlmDisambiguated,
    /// No match was found; a new entity row was created.
    Created,
}
51
/// JSON shape the disambiguation LLM is asked to return:
/// `{"same_entity": true}` or `{"same_entity": false}`.
#[derive(Debug, Deserialize, JsonSchema)]
struct DisambiguationResponse {
    same_entity: bool,
}
57
/// Per-normalized-name async locks; serializes concurrent resolution of the
/// same entity name so two tasks cannot create duplicates.
type NameLockMap = Arc<DashMap<String, Arc<Mutex<()>>>>;
67
/// Resolves extracted entity mentions to graph entity ids, optionally using
/// embedding search and LLM disambiguation for fuzzy matching.
pub struct EntityResolver<'a> {
    // SQLite-backed graph store holding entities, aliases, and edges.
    store: &'a GraphStore,
    // Optional vector store used for embedding-based candidate search.
    embedding_store: Option<&'a Arc<EmbeddingStore>>,
    // Optional LLM provider used for embeddings and disambiguation chats.
    provider: Option<&'a AnyProvider>,
    // Cosine score at/above which an embedding hit is merged automatically.
    similarity_threshold: f32,
    // Cosine score at/above which a hit is "ambiguous" and escalated to the LLM.
    ambiguous_threshold: f32,
    // Per-name locks preventing duplicate concurrent entity creation.
    name_locks: NameLockMap,
    // Counts every degradation to a simpler strategy (embed/search/LLM failure).
    fallback_count: Arc<std::sync::atomic::AtomicU64>,
    // Ensures the Qdrant entity collection is created at most once per resolver.
    collection_ensured: Arc<tokio::sync::OnceCell<()>>,
}
82
83impl<'a> EntityResolver<'a> {
84 #[must_use]
86 pub fn graph_store(&self) -> &GraphStore {
87 self.store
88 }
89
90 #[must_use]
91 pub fn new(store: &'a GraphStore) -> Self {
92 Self {
93 store,
94 embedding_store: None,
95 provider: None,
96 similarity_threshold: 0.85,
97 ambiguous_threshold: 0.70,
98 name_locks: Arc::new(DashMap::new()),
99 fallback_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
100 collection_ensured: Arc::new(tokio::sync::OnceCell::new()),
101 }
102 }
103
104 #[must_use]
105 pub fn with_embedding_store(mut self, store: &'a Arc<EmbeddingStore>) -> Self {
106 self.embedding_store = Some(store);
107 self
108 }
109
110 #[must_use]
111 pub fn with_provider(mut self, provider: &'a AnyProvider) -> Self {
112 self.provider = Some(provider);
113 self
114 }
115
116 #[must_use]
117 pub fn with_thresholds(mut self, similarity: f32, ambiguous: f32) -> Self {
118 self.similarity_threshold = similarity;
119 self.ambiguous_threshold = ambiguous;
120 self
121 }
122
123 #[must_use]
125 pub fn fallback_count(&self) -> Arc<std::sync::atomic::AtomicU64> {
126 Arc::clone(&self.fallback_count)
127 }
128
129 fn normalize_name(name: &str) -> String {
131 let lowered = name.trim().to_lowercase();
132 let cleaned = strip_control_chars(&lowered);
133 let normalized = truncate_to_bytes_ref(&cleaned, MAX_ENTITY_NAME_BYTES).to_owned();
134 if normalized.len() < cleaned.len() {
135 tracing::debug!(
136 "graph resolver: entity name truncated to {} bytes",
137 MAX_ENTITY_NAME_BYTES
138 );
139 }
140 normalized
141 }
142
143 fn parse_entity_type(entity_type: &str) -> EntityType {
145 entity_type
146 .trim()
147 .to_lowercase()
148 .parse::<EntityType>()
149 .unwrap_or_else(|_| {
150 tracing::debug!(
151 "graph resolver: unknown entity type {:?}, falling back to Concept",
152 entity_type
153 );
154 EntityType::Concept
155 })
156 }
157
    /// Acquire the per-name mutex for `normalized`, creating it on first use.
    ///
    /// Serializes concurrent `resolve` calls for the same normalized name so
    /// two tasks cannot both miss the lookup and create duplicate entities.
    ///
    /// NOTE(review): entries are never removed from `name_locks`, so the map
    /// grows with the number of distinct names seen — confirm this is an
    /// acceptable bound for the process lifetime.
    async fn lock_name(&self, normalized: &str) -> tokio::sync::OwnedMutexGuard<()> {
        let lock = self
            .name_locks
            .entry(normalized.to_owned())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();
        lock.lock_owned().await
    }
167
    /// Resolve an entity mention to a graph entity id.
    ///
    /// Pipeline, executed under a per-name lock: alias lookup → exact name
    /// lookup → embedding search (when configured) → create a new entity.
    /// Returns the entity id together with how it was resolved.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError::GraphStore`] when the normalized name is empty
    /// or shorter than `MIN_ENTITY_NAME_BYTES`, or when a store call fails.
    pub async fn resolve(
        &self,
        name: &str,
        entity_type: &str,
        summary: Option<&str>,
    ) -> Result<(i64, ResolutionOutcome), MemoryError> {
        let normalized = Self::normalize_name(name);

        if normalized.is_empty() {
            return Err(MemoryError::GraphStore("empty entity name".into()));
        }

        if normalized.len() < MIN_ENTITY_NAME_BYTES {
            return Err(MemoryError::GraphStore(format!(
                "entity name too short: {normalized:?} ({} bytes, min {MIN_ENTITY_NAME_BYTES})",
                normalized.len()
            )));
        }

        let et = Self::parse_entity_type(entity_type);

        // Original (trimmed) casing is kept as the display/surface form.
        let surface_name = name.trim().to_owned();

        // Hold the per-name lock across the whole lookup-or-create sequence
        // to avoid racing duplicate creation.
        let _guard = self.lock_name(&normalized).await;

        if let Some(entity) = self.store.find_entity_by_alias(&normalized, et).await? {
            self.store
                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
                .await?;
            return Ok((entity.id, ResolutionOutcome::ExactMatch));
        }

        if let Some(entity) = self.store.find_entity(&normalized, et).await? {
            self.store
                .upsert_entity(&surface_name, &entity.canonical_name, et, summary)
                .await?;
            return Ok((entity.id, ResolutionOutcome::ExactMatch));
        }

        if let Some(outcome) = self
            .resolve_via_embedding(&normalized, name, &surface_name, et, summary)
            .await?
        {
            return Ok(outcome);
        }

        // No match anywhere (and no embedding store configured): create fresh.
        let entity_id = self
            .store
            .upsert_entity(&surface_name, &normalized, et, summary)
            .await?;

        self.register_aliases(entity_id, &normalized, name).await?;

        Ok((entity_id, ResolutionOutcome::Created))
    }
247
248 async fn embed_entity_text(
251 &self,
252 provider: &AnyProvider,
253 normalized: &str,
254 summary: Option<&str>,
255 ) -> Option<Vec<f32>> {
256 let safe_summary = truncate_to_bytes_ref(summary.unwrap_or(""), MAX_FACT_BYTES);
257 let embed_text = format!("{normalized}: {safe_summary}");
258 let embed_result = tokio::time::timeout(
259 std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
260 provider.embed(&embed_text),
261 )
262 .await;
263 match embed_result {
264 Ok(Ok(v)) => Some(v),
265 Ok(Err(err)) => {
266 self.fallback_count
267 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
268 tracing::warn!(entity_name = %normalized, error = %err,
269 "embed() failed; falling back to exact-match-only entity creation");
270 None
271 }
272 Err(_timeout) => {
273 self.fallback_count
274 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
275 tracing::warn!(entity_name = %normalized,
276 "embed() timed out after {}s; falling back to create new entity",
277 EMBED_TIMEOUT_SECS);
278 None
279 }
280 }
281 }
282
    /// Handle an embedding hit in the ambiguous band
    /// (`ambiguous_threshold <= score < similarity_threshold`) by asking the
    /// LLM whether the candidate and the new mention are the same entity.
    ///
    /// Returns `Ok(Some(..))` when the LLM confirms the match and the merge
    /// succeeds; `Ok(None)` when they are distinct or disambiguation failed
    /// (the caller then creates a new entity).
    #[allow(clippy::too_many_arguments)] async fn handle_ambiguous_candidate(
        &self,
        emb_store: &EmbeddingStore,
        provider: &AnyProvider,
        payload: &std::collections::HashMap<String, serde_json::Value>,
        point_id: &str,
        score: f32,
        surface_name: &str,
        normalized: &str,
        et: EntityType,
        summary: Option<&str>,
    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
        // entity_id is the only payload field that must be present.
        let entity_id = payload
            .get("entity_id")
            .and_then(serde_json::Value::as_i64)
            .ok_or_else(|| MemoryError::GraphStore("missing entity_id in payload".into()))?;
        let existing_name = payload
            .get("name")
            .and_then(|v| v.as_str())
            .unwrap_or("")
            .to_owned();
        let existing_summary = payload
            .get("summary")
            .and_then(|v| v.as_str())
            .unwrap_or("")
            .to_owned();
        let existing_type = payload
            .get("entity_type")
            .and_then(|v| v.as_str())
            .unwrap_or(et.as_str())
            .to_owned();
        // Borrowed views passed to the merge path; `None` lets merge_entity
        // fall back to a store lookup. NOTE(review): "summary" is extracted
        // twice (owned above for the LLM prompt, borrowed here for the merge)
        // — presumably intentional, but could be unified.
        let existing_canonical = payload.get("canonical_name").and_then(|v| v.as_str());
        let existing_summary_str = payload.get("summary").and_then(|v| v.as_str());
        match self
            .llm_disambiguate(
                provider,
                normalized,
                et.as_str(),
                summary.unwrap_or(""),
                &existing_name,
                &existing_type,
                &existing_summary,
                score,
            )
            .await
        {
            Some(true) => {
                self.merge_entity(
                    emb_store,
                    provider,
                    entity_id,
                    surface_name,
                    normalized,
                    et,
                    summary,
                    existing_canonical,
                    existing_summary_str,
                    Some(point_id),
                )
                .await?;
                Ok(Some((entity_id, ResolutionOutcome::LlmDisambiguated)))
            }
            Some(false) => Ok(None),
            None => {
                // LLM unavailable or unparseable: count it and create new.
                self.fallback_count
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                tracing::warn!(entity_name = %normalized,
                    "LLM disambiguation failed; falling back to create new entity");
                Ok(None)
            }
        }
    }
359
    /// Embedding-based resolution: search the entity collection for nearest
    /// neighbours of the (name + summary) embedding, then merge with a strong
    /// match, LLM-disambiguate an ambiguous one, or create a new entity
    /// indexed with the already-computed vector.
    ///
    /// Returns `Ok(None)` only when embeddings are unavailable (no store or
    /// provider configured, or the embed call itself failed), signalling the
    /// caller to fall back to plain creation.
    async fn resolve_via_embedding(
        &self,
        normalized: &str,
        original_name: &str,
        surface_name: &str,
        et: EntityType,
        summary: Option<&str>,
    ) -> Result<Option<(i64, ResolutionOutcome)>, MemoryError> {
        // Both an embedding store and a provider are required for this path.
        let (Some(emb_store), Some(provider)) = (self.embedding_store, self.provider) else {
            return Ok(None);
        };

        let Some(query_vec) = self.embed_entity_text(provider, normalized, summary).await else {
            return Ok(None);
        };

        // Only candidates of the same entity type are considered.
        let type_filter = VectorFilter {
            must: vec![FieldCondition {
                field: "entity_type".into(),
                value: FieldValue::Text(et.as_str().to_owned()),
            }],
            must_not: vec![],
        };
        let candidates = match emb_store
            .search_collection(ENTITY_COLLECTION, &query_vec, 5, Some(type_filter))
            .await
        {
            Ok(c) => c,
            Err(err) => {
                // Search failure degrades to creating a new entity — still
                // indexed, since the query vector is already computed.
                self.fallback_count
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                tracing::warn!(entity_name = %normalized, error = %err,
                    "Qdrant search failed; falling back to create new entity");
                return self
                    .create_with_embedding(
                        emb_store,
                        surface_name,
                        normalized,
                        original_name,
                        et,
                        summary,
                        &query_vec,
                    )
                    .await
                    .map(Some);
            }
        };

        if let Some(best) = candidates.first() {
            let score = best.score;
            if score >= self.similarity_threshold {
                // Confident match: merge directly without consulting the LLM.
                let entity_id = best
                    .payload
                    .get("entity_id")
                    .and_then(serde_json::Value::as_i64)
                    .ok_or_else(|| {
                        MemoryError::GraphStore("missing entity_id in payload".into())
                    })?;
                let existing_canonical =
                    best.payload.get("canonical_name").and_then(|v| v.as_str());
                let existing_summary = best.payload.get("summary").and_then(|v| v.as_str());
                let existing_pid = Some(best.id.as_str());
                self.merge_entity(
                    emb_store,
                    provider,
                    entity_id,
                    surface_name,
                    normalized,
                    et,
                    summary,
                    existing_canonical,
                    existing_summary,
                    existing_pid,
                )
                .await?;
                return Ok(Some((
                    entity_id,
                    ResolutionOutcome::EmbeddingMatch { score },
                )));
            } else if score >= self.ambiguous_threshold
                && let Some(result) = self
                    .handle_ambiguous_candidate(
                        emb_store,
                        provider,
                        &best.payload,
                        &best.id,
                        score,
                        surface_name,
                        normalized,
                        et,
                        summary,
                    )
                    .await?
            {
                return Ok(Some(result));
            }
        }

        // No candidate, or the LLM said "different": create a new entity and
        // index it with the vector already in hand.
        self.create_with_embedding(
            emb_store,
            surface_name,
            normalized,
            original_name,
            et,
            summary,
            &query_vec,
        )
        .await
        .map(Some)
    }
474
475 #[allow(clippy::too_many_arguments)] async fn create_with_embedding(
478 &self,
479 emb_store: &EmbeddingStore,
480 surface_name: &str,
481 normalized: &str,
482 original_name: &str,
483 et: EntityType,
484 summary: Option<&str>,
485 query_vec: &[f32],
486 ) -> Result<(i64, ResolutionOutcome), MemoryError> {
487 let entity_id = self
488 .store
489 .upsert_entity(surface_name, normalized, et, summary)
490 .await?;
491 self.register_aliases(entity_id, normalized, original_name)
492 .await?;
493 self.store_entity_embedding(
494 emb_store,
495 entity_id,
496 None,
497 normalized,
498 et,
499 summary.unwrap_or(""),
500 query_vec,
501 )
502 .await;
503 Ok((entity_id, ResolutionOutcome::Created))
504 }
505
    /// Register lookup aliases for a freshly created entity: the normalized
    /// name, plus the cleaned original name when it differs.
    ///
    /// NOTE(review): the cleanup applied to `original_name` here (trim,
    /// lowercase, strip control chars, truncate) matches `normalize_name`
    /// exactly, and both callers pass `normalized = normalize_name(original)`,
    /// so the second `add_alias` appears unreachable — confirm whether the
    /// intent was to keep the original casing as an alias instead.
    async fn register_aliases(
        &self,
        entity_id: i64,
        normalized: &str,
        original_name: &str,
    ) -> Result<(), MemoryError> {
        self.store.add_alias(entity_id, normalized).await?;

        let original_trimmed = original_name.trim().to_lowercase();
        let original_clean_str = strip_control_chars(&original_trimmed);
        let original_clean = truncate_to_bytes_ref(&original_clean_str, MAX_ENTITY_NAME_BYTES);
        if original_clean != normalized {
            self.store.add_alias(entity_id, original_clean).await?;
        }

        Ok(())
    }
526
    /// Merge a newly seen mention into an existing entity: combine summaries,
    /// upsert the surface form under the existing canonical name, and
    /// re-embed so the vector index stays current.
    ///
    /// `existing_*` arguments come from the Qdrant payload when available;
    /// otherwise the entity row is fetched from the store to fill the gaps.
    /// Re-embedding failures are logged (the Qdrant entry may go stale) but
    /// never fail the merge.
    #[allow(clippy::too_many_arguments)] async fn merge_entity(
        &self,
        emb_store: &EmbeddingStore,
        provider: &AnyProvider,
        entity_id: i64,
        new_surface_name: &str,
        new_canonical_name: &str,
        entity_type: EntityType,
        new_summary: Option<&str>,
        existing_canonical_name: Option<&str>,
        existing_summary_payload: Option<&str>,
        existing_point_id: Option<&str>,
    ) -> Result<(), MemoryError> {
        // Resolve canonical name / summary / point id, preferring payload
        // values; fall back to a store lookup when either is missing.
        let (existing_canonical, existing_summary, existing_point_id_owned) =
            if existing_canonical_name.is_some() && existing_summary_payload.is_some() {
                (
                    existing_canonical_name
                        .unwrap_or(new_canonical_name)
                        .to_owned(),
                    existing_summary_payload.unwrap_or("").to_owned(),
                    existing_point_id.map(ToOwned::to_owned),
                )
            } else {
                let existing = self.store.find_entity_by_id(entity_id).await?;
                let canonical = existing_canonical_name.map_or_else(
                    || {
                        existing.as_ref().map_or_else(
                            || new_canonical_name.to_owned(),
                            |e| e.canonical_name.clone(),
                        )
                    },
                    ToOwned::to_owned,
                );
                let summary = existing
                    .as_ref()
                    .and_then(|e| e.summary.as_deref())
                    .unwrap_or("")
                    .to_owned();
                let pid = existing_point_id.map(ToOwned::to_owned).or_else(|| {
                    existing
                        .as_ref()
                        .and_then(|e| e.qdrant_point_id.as_deref())
                        .map(ToOwned::to_owned)
                });
                (canonical, summary, pid)
            };

        // Concatenate old and new summaries ("old; new"), capped at
        // MAX_FACT_BYTES; otherwise keep whichever side is non-empty.
        let merged_summary = if let Some(new) = new_summary {
            if !new.is_empty() && !existing_summary.is_empty() {
                let combined = format!("{existing_summary}; {new}");
                truncate_to_bytes_ref(&combined, MAX_FACT_BYTES).to_owned()
            } else if !new.is_empty() {
                new.to_owned()
            } else {
                existing_summary.clone()
            }
        } else {
            existing_summary.clone()
        };

        let summary_opt = if merged_summary.is_empty() {
            None
        } else {
            Some(merged_summary.as_str())
        };

        // Record the new surface form under the existing canonical name.
        self.store
            .upsert_entity(
                new_surface_name,
                &existing_canonical,
                entity_type,
                summary_opt,
            )
            .await?;

        // Re-embed with the merged summary; failures only leave the vector
        // index stale, they never abort the merge.
        let embed_text = format!("{new_surface_name}: {merged_summary}");
        let embed_result = tokio::time::timeout(
            std::time::Duration::from_secs(EMBED_TIMEOUT_SECS),
            provider.embed(&embed_text),
        )
        .await;

        match embed_result {
            Ok(Ok(vec)) => {
                self.store_entity_embedding(
                    emb_store,
                    entity_id,
                    existing_point_id_owned.as_deref(),
                    new_surface_name,
                    entity_type,
                    &merged_summary,
                    &vec,
                )
                .await;
            }
            Ok(Err(err)) => {
                tracing::warn!(
                    entity_id,
                    error = %err,
                    "merge re-embed failed; Qdrant entry may be stale"
                );
            }
            Err(_) => {
                tracing::warn!(
                    entity_id,
                    "merge re-embed timed out; Qdrant entry may be stale"
                );
            }
        }

        Ok(())
    }
658
    /// Best-effort upsert of an entity vector into the Qdrant entity
    /// collection; every failure is logged and swallowed so graph writes
    /// never depend on the vector store.
    ///
    /// With `existing_point_id` the point is updated in place; otherwise a
    /// new point is stored and its id written back to the entity row so later
    /// merges update the same point instead of creating duplicates.
    #[allow(clippy::too_many_arguments)] async fn store_entity_embedding(
        &self,
        emb_store: &EmbeddingStore,
        entity_id: i64,
        existing_point_id: Option<&str>,
        name: &str,
        entity_type: EntityType,
        summary: &str,
        vector: &[f32],
    ) {
        // usize -> u64 cannot fail on supported targets; 384 is a defensive
        // default (presumably a typical embedding dim — TODO confirm).
        let vector_size = u64::try_from(vector.len()).unwrap_or(384);
        // Create the collection at most once per resolver, even across
        // concurrent calls.
        let collection_ensured = Arc::clone(&self.collection_ensured);
        if let Err(err) = collection_ensured
            .get_or_try_init(|| async {
                emb_store
                    .ensure_named_collection(ENTITY_COLLECTION, vector_size)
                    .await
            })
            .await
        {
            tracing::error!(
                error = %err,
                "failed to ensure entity embedding collection; skipping Qdrant upsert"
            );
            return;
        }

        let payload = serde_json::json!({
            "entity_id": entity_id,
            // String copy of the id — presumably for filters that match only
            // text fields; verify against query sites.
            "entity_id_str": entity_id.to_string(),
            "canonical_name": name,
            "name": name,
            "entity_type": entity_type.as_str(),
            "summary": summary,
        });

        if let Some(point_id) = existing_point_id {
            // Known point: overwrite it in place.
            if let Err(err) = emb_store
                .upsert_to_collection(ENTITY_COLLECTION, point_id, payload, vector.to_vec())
                .await
            {
                tracing::warn!(
                    entity_id,
                    error = %err,
                    "Qdrant upsert (existing point) failed; Qdrant entry may be stale"
                );
            }
        } else {
            // New point: store it, then persist the generated id in SQLite.
            match emb_store
                .store_to_collection(ENTITY_COLLECTION, payload, vector.to_vec())
                .await
            {
                Ok(point_id) => {
                    if let Err(err) = self
                        .store
                        .set_entity_qdrant_point_id(entity_id, &point_id)
                        .await
                    {
                        tracing::warn!(
                            entity_id,
                            error = %err,
                            "failed to store qdrant_point_id in SQLite"
                        );
                    }
                }
                Err(err) => {
                    tracing::warn!(
                        entity_id,
                        error = %err,
                        "Qdrant upsert failed; entity created in SQLite, qdrant_point_id remains NULL"
                    );
                }
            }
        }
    }
751
    /// Ask the LLM whether a new mention and an existing entity refer to the
    /// same real-world entity.
    ///
    /// Returns `Some(true/false)` on a parsed verdict, `None` when the chat
    /// call fails or the response is not valid JSON (callers treat `None` as
    /// "create a new entity"). Untrusted entity text is wrapped in
    /// `<external-data>` tags to separate it from prompt instructions.
    #[allow(clippy::too_many_arguments)] async fn llm_disambiguate(
        &self,
        provider: &AnyProvider,
        new_name: &str,
        new_type: &str,
        new_summary: &str,
        existing_name: &str,
        existing_type: &str,
        existing_summary: &str,
        score: f32,
    ) -> Option<bool> {
        let prompt = format!(
            "New entity:\n\
             - Name: <external-data>{new_name}</external-data>\n\
             - Type: <external-data>{new_type}</external-data>\n\
             - Summary: <external-data>{new_summary}</external-data>\n\
             \n\
             Existing entity:\n\
             - Name: <external-data>{existing_name}</external-data>\n\
             - Type: <external-data>{existing_type}</external-data>\n\
             - Summary: <external-data>{existing_summary}</external-data>\n\
             \n\
             Cosine similarity: {score:.2}\n\
             \n\
             Are these the same entity? Respond with JSON: {{\"same_entity\": true}} or {{\"same_entity\": false}}"
        );

        let messages = [
            Message::from_legacy(
                Role::System,
                "You are an entity disambiguation assistant. Given a new entity mention and \
                 an existing entity from the knowledge graph, determine if they refer to the same \
                 real-world entity. Respond only with JSON.",
            ),
            Message::from_legacy(Role::User, prompt),
        ];

        let response = match provider.chat(&messages).await {
            Ok(r) => r,
            Err(err) => {
                tracing::warn!(error = %err, "LLM disambiguation chat failed");
                return None;
            }
        };

        // Models often wrap JSON in code fences; extract before parsing.
        let json_str = extract_json(&response);
        match serde_json::from_str::<DisambiguationResponse>(json_str) {
            Ok(parsed) => Some(parsed.same_entity),
            Err(err) => {
                tracing::warn!(error = %err, response = %response, "failed to parse LLM disambiguation response");
                None
            }
        }
    }
811
812 pub async fn resolve_batch(
825 &self,
826 entities: &[ExtractedEntity],
827 ) -> Result<Vec<(i64, ResolutionOutcome)>, MemoryError> {
828 if entities.is_empty() {
829 return Ok(Vec::new());
830 }
831
832 let mut results: Vec<Option<(i64, ResolutionOutcome)>> = vec![None; entities.len()];
834
835 let mut stream = stream::iter(entities.iter().enumerate().map(|(i, e)| {
836 let name = e.name.clone();
837 let entity_type = e.entity_type.clone();
838 let summary = e.summary.clone();
839 async move {
840 let result = self.resolve(&name, &entity_type, summary.as_deref()).await;
841 (i, result)
842 }
843 }))
844 .buffer_unordered(4);
845
846 while let Some((i, result)) = stream.next().await {
847 match result {
848 Ok(outcome) => results[i] = Some(outcome),
849 Err(err) => return Err(err),
850 }
851 }
852
853 Ok(results
854 .into_iter()
855 .enumerate()
856 .map(|(i, r)| {
857 r.unwrap_or_else(|| {
858 tracing::warn!(
859 index = i,
860 "resolve_batch: missing result at index — bug in stream collection"
861 );
862 panic!("resolve_batch: missing result at index {i}")
863 })
864 })
865 .collect())
866 }
867
868 pub async fn resolve_edge(
882 &self,
883 source_id: i64,
884 target_id: i64,
885 relation: &str,
886 fact: &str,
887 confidence: f32,
888 episode_id: Option<MessageId>,
889 ) -> Result<Option<i64>, MemoryError> {
890 let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
891 let normalized_relation =
892 truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();
893
894 let fact_clean = strip_control_chars(fact.trim());
895 let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
896
897 let existing_edges = self.store.edges_exact(source_id, target_id).await?;
899
900 let matching = existing_edges
901 .iter()
902 .find(|e| e.relation == normalized_relation);
903
904 if let Some(old) = matching {
905 if old.fact == normalized_fact {
906 return Ok(None);
908 }
909 self.store.invalidate_edge(old.id).await?;
911 }
912
913 let new_id = self
914 .store
915 .insert_edge(
916 source_id,
917 target_id,
918 &normalized_relation,
919 &normalized_fact,
920 confidence,
921 episode_id,
922 )
923 .await?;
924 Ok(Some(new_id))
925 }
926
    /// Typed-edge variant of `resolve_edge` with optional belief revision.
    ///
    /// Matching considers both the normalized relation and `edge_type`. When
    /// `belief_revision` and a provider are available, the new fact is
    /// embedded (5s timeout) and semantically superseded edges are found via
    /// `find_superseded_edges`; any failure in that path falls back to the
    /// exact relation/type match (if one exists).
    ///
    /// Returns `Ok(None)` when an identical edge (relation + type + fact)
    /// already exists; otherwise the id of the newly inserted edge.
    ///
    /// # Errors
    ///
    /// Propagates graph-store failures.
    #[allow(clippy::too_many_arguments)] pub async fn resolve_edge_typed(
        &self,
        source_id: i64,
        target_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<crate::types::MessageId>,
        edge_type: crate::graph::EdgeType,
        belief_revision: Option<&crate::graph::BeliefRevisionConfig>,
    ) -> Result<Option<i64>, MemoryError> {
        let relation_clean = strip_control_chars(&relation.trim().to_lowercase());
        let normalized_relation =
            truncate_to_bytes_ref(&relation_clean, MAX_RELATION_BYTES).to_owned();

        let fact_clean = strip_control_chars(fact.trim());
        let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();

        let existing_edges = self.store.edges_exact(source_id, target_id).await?;

        let matching = existing_edges
            .iter()
            .find(|e| e.relation == normalized_relation && e.edge_type == edge_type);

        // Exact duplicate: nothing to insert.
        if matching.is_some_and(|old| old.fact == normalized_fact) {
            return Ok(None);
        }

        // Decide which old edges the new fact supersedes: semantic comparison
        // under belief revision, otherwise just the exact match found above.
        let superseded_ids: Vec<i64> = if let (Some(cfg), Some(provider)) =
            (belief_revision, self.provider)
        {
            match tokio::time::timeout(
                std::time::Duration::from_secs(5),
                provider.embed(&normalized_fact),
            )
            .await
            {
                Ok(Ok(new_emb)) => {
                    match crate::graph::belief_revision::find_superseded_edges(
                        &existing_edges,
                        &new_emb,
                        &normalized_relation,
                        edge_type,
                        provider,
                        cfg,
                    )
                    .await
                    {
                        Ok(ids) => ids,
                        Err(err) => {
                            tracing::warn!(error = %err,
                                "belief_revision: find_superseded_edges failed, falling back to exact match");
                            matching.map(|e| vec![e.id]).unwrap_or_default()
                        }
                    }
                }
                Ok(Err(err)) => {
                    tracing::warn!(error = %err,
                        "belief_revision: embed new fact failed, falling back to exact match");
                    matching.map(|e| vec![e.id]).unwrap_or_default()
                }
                Err(_) => {
                    tracing::warn!(
                        "belief_revision: embed new fact timed out, falling back to exact match"
                    );
                    matching.map(|e| vec![e.id]).unwrap_or_default()
                }
            }
        } else {
            matching.map(|e| vec![e.id]).unwrap_or_default()
        };

        // Insert the new edge first so supersession links can point at it.
        let new_id = self
            .store
            .insert_edge_typed(
                source_id,
                target_id,
                &normalized_relation,
                &normalized_fact,
                confidence,
                episode_id,
                edge_type,
            )
            .await?;

        for old_id in superseded_ids {
            if belief_revision.is_some() {
                self.store
                    .invalidate_edge_with_supersession(old_id, new_id)
                    .await?;
            } else {
                self.store.invalidate_edge(old_id).await?;
            }
        }

        Ok(Some(new_id))
    }
1046}
1047
/// Pull the JSON payload out of an LLM response: strips a Markdown code fence
/// (```json … ``` or ``` … ```) when present, otherwise returns the outermost
/// `{ … }` span, otherwise the trimmed input unchanged.
fn extract_json(s: &str) -> &str {
    let text = s.trim();

    // Prefer a fenced code block: strip the opening fence (language-tagged
    // form first), then cut at the last closing fence.
    for fence in ["```json", "```"] {
        if let Some(body) = text.strip_prefix(fence) {
            if let Some(close) = body.rfind("```") {
                return body[..close].trim();
            }
        }
    }

    // Fall back to the outermost brace span, provided the braces are ordered.
    match (text.find('{'), text.rfind('}')) {
        (Some(open), Some(close)) if open <= close => &text[open..=close],
        _ => text,
    }
}
1070
1071#[cfg(test)]
1072mod tests;