1use std::sync::Arc;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use std::sync::atomic::Ordering;
9use tokio_util::sync::CancellationToken;
10use zeph_db::DbPool;
11
12pub use zeph_common::config::memory::NoteLinkingConfig;
13use zeph_common::sanitize::strip_control_chars;
14use zeph_common::text::truncate_to_bytes_ref;
15use zeph_llm::any::AnyProvider;
16use zeph_llm::provider::LlmProvider as _;
17
18use crate::embedding_store::EmbeddingStore;
19use crate::error::MemoryError;
20use crate::graph::extractor::ExtractionResult as ExtractorResult;
21use crate::vector_store::VectorFilter;
22
23use super::SemanticMemory;
24
/// Optional hook run on the raw extraction output before anything is persisted.
///
/// When the closure returns `Err(reason)`, [`extract_and_store`] logs `reason` at
/// warn level and returns an empty [`ExtractionResult`] without upserting any
/// entities or edges.
pub type PostExtractValidator = Option<Box<dyn Fn(&ExtractorResult) -> Result<(), String> + Send>>;
30
/// Settings for a single background graph-extraction run and the follow-up
/// maintenance work (note linking, community refresh, eviction, link-weight decay).
#[derive(Debug, Clone)]
pub struct GraphExtractionConfig {
    /// Maximum entities requested from the extractor (passed to `GraphExtractor::new`).
    pub max_entities: usize,
    /// Maximum edges requested from the extractor (passed to `GraphExtractor::new`).
    pub max_edges: usize,
    /// Overall time budget, in seconds, for `extract_and_store` in the background task.
    pub extraction_timeout_secs: u64,
    /// Community refresh runs when the persisted `extraction_count` counter is a
    /// multiple of this value (checked only after a successful extraction); `0` disables it.
    pub community_refresh_interval: usize,
    /// Retention window passed to graph eviction for expired edges.
    pub expired_edge_retention_days: u32,
    /// Entity cap passed to graph eviction.
    pub max_entities_cap: usize,
    /// Prompt-size limit passed to community detection for summaries.
    pub community_summary_max_prompt_bytes: usize,
    /// Concurrency passed to community detection for summaries.
    pub community_summary_concurrency: usize,
    /// Edge chunk size passed to community detection (label propagation).
    pub lpa_edge_chunk_size: usize,
    /// Settings for linking newly extracted entities to similar ones via `similar_to` edges.
    pub note_linking: NoteLinkingConfig,
    /// Decay factor passed to `decay_edge_retrieval_counts`; decay is skipped unless `> 0`.
    pub link_weight_decay_lambda: f64,
    /// Minimum seconds between decay runs (tracked in `last_link_weight_decay_at`
    /// metadata); decay is skipped when `0`.
    pub link_weight_decay_interval_secs: u64,
    /// Enables belief revision on the non-APEX-MEM edge path.
    pub belief_revision_enabled: bool,
    /// Similarity threshold handed to belief revision when it is enabled.
    pub belief_revision_similarity_threshold: f32,
    /// When set, resolved entities are linked to this conversation's episode.
    pub conversation_id: Option<i64>,
    /// Write edges via `insert_or_supersede_with_turn_index_and_metrics` (with
    /// relation/fact sanitization) instead of `resolve_edge_typed`.
    pub apex_mem_enabled: bool,
    /// LLM call timeout, in seconds, passed to `GraphExtractor::new`.
    pub llm_timeout_secs: u64,
    /// Embedding timeout, in seconds, passed to the entity resolver.
    pub embed_timeout_secs: u64,
    /// Turn index recorded on edges written by the APEX-MEM path.
    pub turn_index: Option<u32>,
    /// When set, edges whose confidence (treated as `1.0` if absent) is below this
    /// value AND whose relation is low-signal are dropped before insertion.
    pub write_gate_min_relevance: Option<f32>,
    /// Fast rate passed to `GraphStore::with_benna_rates`.
    pub benna_fast_rate: f32,
    /// Slow rate passed to `GraphStore::with_benna_rates`.
    pub benna_slow_rate: f32,
}
82
83impl Default for GraphExtractionConfig {
84 fn default() -> Self {
85 Self {
86 max_entities: 0,
87 max_edges: 0,
88 extraction_timeout_secs: 0,
89 community_refresh_interval: 0,
90 expired_edge_retention_days: 0,
91 max_entities_cap: 0,
92 community_summary_max_prompt_bytes: 0,
93 community_summary_concurrency: 0,
94 lpa_edge_chunk_size: 0,
95 note_linking: NoteLinkingConfig::default(),
96 link_weight_decay_lambda: 0.95,
97 link_weight_decay_interval_secs: 86400,
98 belief_revision_enabled: false,
99 belief_revision_similarity_threshold: 0.85,
100 conversation_id: None,
101 apex_mem_enabled: false,
102 llm_timeout_secs: 30,
103 embed_timeout_secs: 5,
104 turn_index: None,
105 write_gate_min_relevance: None,
106 benna_fast_rate: 0.5,
107 benna_slow_rate: 0.05,
108 }
109 }
110}
111
/// Counters describing what a single extraction wrote to the graph.
#[derive(Debug, Default)]
pub struct ExtractionStats {
    /// Number of extracted entities the resolver resolved/upserted successfully.
    pub entities_upserted: usize,
    /// Number of extracted edges that were actually inserted.
    pub edges_inserted: usize,
}
118
/// Outcome of [`extract_and_store`]; all-empty when extraction yields nothing or
/// validation rejects it.
#[derive(Debug, Default)]
pub struct ExtractionResult {
    /// Write counters for this extraction.
    pub stats: ExtractionStats,
    /// IDs of the entities resolved during this extraction; used as the input
    /// set for episode linking and note linking.
    pub entity_ids: Vec<i64>,
}
126
/// Counters reported by [`link_memory_notes`].
#[derive(Debug, Default)]
pub struct LinkingStats {
    /// Entities whose similarity search completed successfully.
    pub entities_processed: usize,
    /// `similar_to` edges successfully inserted.
    pub edges_created: usize,
}
133
/// Vector-store collection holding entity embeddings; searched for note-linking candidates.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Byte cap applied to relation labels before storage on the APEX-MEM edge path.
const MAX_RELATION_BYTES: usize = 256;
/// Byte cap applied to edge fact text before storage on the APEX-MEM edge path.
const MAX_FACT_BYTES: usize = 2048;
/// Confidence assigned to an extracted edge when the LLM did not supply one.
const DEFAULT_EDGE_CONFIDENCE: f32 = 0.8;
143
/// Per-entity input for note linking, loaded from the graph store.
struct EntityWorkItem {
    /// Graph entity ID.
    entity_id: i64,
    /// Canonical name, used for logging.
    canonical_name: String,
    /// Text to embed: `"<name>: <summary>"` when a non-empty summary exists,
    /// otherwise just the canonical name.
    embed_text: String,
    /// The entity's own vector point ID, if any; used to exclude self-matches.
    self_point_id: Option<String>,
}
151
152pub async fn link_memory_notes(
168 entity_ids: &[i64],
169 pool: DbPool,
170 embedding_store: Arc<EmbeddingStore>,
171 provider: AnyProvider,
172 cfg: &NoteLinkingConfig,
173) -> LinkingStats {
174 use crate::graph::GraphStore;
175
176 let store = GraphStore::new(pool);
177 let mut stats = LinkingStats::default();
178
179 let work_items = collect_note_link_work_items(entity_ids, &store).await;
180 if work_items.is_empty() {
181 return stats;
182 }
183
184 let valid = embed_work_items(&work_items, &provider, cfg).await;
185
186 let search_limit = cfg.top_k + 1; let search_results = search_similar_for_items(&valid, &embedding_store, search_limit).await;
188
189 insert_similarity_edges(
190 &work_items,
191 &valid,
192 &search_results,
193 cfg,
194 &store,
195 &mut stats,
196 )
197 .await;
198
199 stats
200}
201
202async fn collect_note_link_work_items(
206 entity_ids: &[i64],
207 store: &crate::graph::GraphStore,
208) -> Vec<EntityWorkItem> {
209 let mut work_items: Vec<EntityWorkItem> = Vec::with_capacity(entity_ids.len());
210 for &entity_id in entity_ids {
211 let entity = match store.find_entity_by_id(entity_id).await {
212 Ok(Some(e)) => e,
213 Ok(None) => {
214 tracing::debug!("note_linking: entity {entity_id} not found, skipping");
215 continue;
216 }
217 Err(e) => {
218 tracing::debug!("note_linking: DB error loading entity {entity_id}: {e:#}");
219 continue;
220 }
221 };
222 let embed_text = match &entity.summary {
223 Some(s) if !s.is_empty() => format!("{}: {s}", entity.canonical_name),
224 _ => entity.canonical_name.clone(),
225 };
226 work_items.push(EntityWorkItem {
227 entity_id,
228 canonical_name: entity.canonical_name,
229 embed_text,
230 self_point_id: entity.qdrant_point_id,
231 });
232 }
233 work_items
234}
235
236async fn embed_work_items(
241 work_items: &[EntityWorkItem],
242 provider: &AnyProvider,
243 cfg: &NoteLinkingConfig,
244) -> Vec<(usize, Vec<f32>)> {
245 use futures::future;
246
247 let Ok(embed_results) = tokio::time::timeout(
248 std::time::Duration::from_secs(cfg.timeout_secs),
249 future::join_all(work_items.iter().map(|w| provider.embed(&w.embed_text))),
250 )
251 .await
252 else {
253 tracing::warn!(
254 count = work_items.len(),
255 "note_linking: batch embed timed out — skipping all entities"
256 );
257 return Vec::new();
258 };
259 embed_results
260 .into_iter()
261 .enumerate()
262 .filter_map(|(i, r)| match r {
263 Ok(v) => Some((i, v)),
264 Err(e) => {
265 tracing::debug!(
266 "note_linking: embed failed for entity {:?}: {e:#}",
267 work_items[i].canonical_name
268 );
269 None
270 }
271 })
272 .collect()
273}
274
275async fn search_similar_for_items(
277 valid: &[(usize, Vec<f32>)],
278 embedding_store: &EmbeddingStore,
279 search_limit: usize,
280) -> Vec<Result<Vec<crate::ScoredVectorPoint>, MemoryError>> {
281 use futures::future;
282
283 future::join_all(valid.iter().map(|(_, vec)| {
284 embedding_store.search_collection(
285 ENTITY_COLLECTION,
286 vec,
287 search_limit,
288 None::<VectorFilter>,
289 )
290 }))
291 .await
292}
293
294async fn insert_similarity_edges(
299 work_items: &[EntityWorkItem],
300 valid: &[(usize, Vec<f32>)],
301 search_results: &[Result<Vec<crate::ScoredVectorPoint>, MemoryError>],
302 cfg: &NoteLinkingConfig,
303 store: &crate::graph::GraphStore,
304 stats: &mut LinkingStats,
305) {
306 let mut seen_pairs = std::collections::HashSet::new();
307
308 for ((work_idx, _), search_result) in valid.iter().zip(search_results.iter()) {
309 let w = &work_items[*work_idx];
310
311 let results = match search_result {
312 Ok(r) => r,
313 Err(e) => {
314 tracing::debug!(
315 "note_linking: search failed for entity {:?}: {e:#}",
316 w.canonical_name
317 );
318 continue;
319 }
320 };
321
322 stats.entities_processed += 1;
323
324 let self_point_id = w.self_point_id.as_deref();
325 let candidates = results
326 .iter()
327 .filter(|p| Some(p.id.as_str()) != self_point_id && p.score >= cfg.similarity_threshold)
328 .take(cfg.top_k);
329
330 for point in candidates {
331 let Some(target_id) = point
332 .payload
333 .get("entity_id")
334 .and_then(serde_json::Value::as_i64)
335 else {
336 tracing::debug!(
337 "note_linking: missing entity_id in payload for point {}",
338 point.id
339 );
340 continue;
341 };
342
343 if target_id == w.entity_id {
344 continue; }
346
347 let (src, tgt) = if w.entity_id < target_id {
349 (w.entity_id, target_id)
350 } else {
351 (target_id, w.entity_id)
352 };
353
354 if !seen_pairs.insert((src, tgt)) {
355 continue;
356 }
357
358 let fact = format!("Semantically similar entities (score: {:.3})", point.score);
359
360 match store
361 .insert_edge(src, tgt, "similar_to", &fact, point.score, None)
362 .await
363 {
364 Ok(_) => stats.edges_created += 1,
365 Err(e) => {
366 tracing::debug!("note_linking: insert_edge failed: {e:#}");
367 }
368 }
369 }
370 }
371}
372
373#[cfg_attr(
384 feature = "profiling",
385 tracing::instrument(name = "memory.graph_extract", skip_all, fields(entities = tracing::field::Empty, edges = tracing::field::Empty))
386)]
387pub async fn extract_and_store(
388 content: String,
389 context_messages: Vec<String>,
390 provider: AnyProvider,
391 pool: DbPool,
392 config: GraphExtractionConfig,
393 post_extract_validator: PostExtractValidator,
394 embedding_store: Option<Arc<EmbeddingStore>>,
395) -> Result<ExtractionResult, MemoryError> {
396 use crate::graph::{EntityResolver, GraphExtractor, GraphStore};
397
398 let extractor = GraphExtractor::new(
399 provider.clone(),
400 config.max_entities,
401 config.max_edges,
402 config.llm_timeout_secs,
403 );
404 let ctx_refs: Vec<&str> = context_messages.iter().map(String::as_str).collect();
405
406 let store =
407 GraphStore::new(pool).with_benna_rates(config.benna_fast_rate, config.benna_slow_rate);
408
409 bump_extraction_count(store.pool()).await?;
410
411 let Some(result) = extractor.extract(&content, &ctx_refs).await? else {
412 return Ok(ExtractionResult::default());
413 };
414
415 if let Some(ref validator) = post_extract_validator
418 && let Err(reason) = validator(&result)
419 {
420 tracing::warn!(
421 reason,
422 "graph extraction validation failed, skipping upsert"
423 );
424 return Ok(ExtractionResult::default());
425 }
426
427 let resolver = if let Some(ref emb) = embedding_store {
428 EntityResolver::new(&store)
429 .with_embedding_store(emb)
430 .with_provider(&provider)
431 .with_embed_timeout(config.embed_timeout_secs)
432 } else {
433 EntityResolver::new(&store).with_embed_timeout(config.embed_timeout_secs)
434 };
435
436 let (entity_name_to_id, entities_upserted) = upsert_entities(&resolver, &result.entities).await;
437 let edges_inserted = insert_edges(&resolver, &result.edges, &entity_name_to_id, &config).await;
438
439 #[cfg(any(feature = "sqlite", feature = "postgres"))]
440 store.checkpoint_wal().await?;
441
442 let new_entity_ids: Vec<i64> = entity_name_to_id.into_values().collect();
443
444 link_episode(&store, &config, &new_entity_ids).await;
445
446 #[cfg(feature = "profiling")]
447 {
448 let span = tracing::Span::current();
449 span.record("entities", entities_upserted);
450 span.record("edges", edges_inserted);
451 }
452
453 Ok(ExtractionResult {
454 stats: ExtractionStats {
455 entities_upserted,
456 edges_inserted,
457 },
458 entity_ids: new_entity_ids,
459 })
460}
461
462async fn bump_extraction_count(pool: &DbPool) -> Result<(), MemoryError> {
464 zeph_db::query(sql!(
465 "INSERT INTO graph_metadata (key, value) VALUES ('extraction_count', '0')
466 ON CONFLICT(key) DO NOTHING"
467 ))
468 .execute(pool)
469 .await?;
470 zeph_db::query(sql!(
471 "UPDATE graph_metadata
472 SET value = CAST(CAST(value AS INTEGER) + 1 AS TEXT)
473 WHERE key = 'extraction_count'"
474 ))
475 .execute(pool)
476 .await?;
477 Ok(())
478}
479
480async fn upsert_entities(
482 resolver: &crate::graph::EntityResolver<'_>,
483 entities: &[crate::graph::extractor::ExtractedEntity],
484) -> (std::collections::HashMap<String, i64>, usize) {
485 let mut entity_name_to_id: std::collections::HashMap<String, i64> =
486 std::collections::HashMap::new();
487 let mut entities_upserted = 0usize;
488
489 for entity in entities {
490 match resolver
491 .resolve(&entity.name, &entity.entity_type, entity.summary.as_deref())
492 .await
493 {
494 Ok((id, _outcome)) => {
495 entity_name_to_id.insert(entity.name.clone(), id);
496 entities_upserted += 1;
497 }
498 Err(e) => {
499 tracing::debug!("graph: skipping entity {:?}: {e:#}", entity.name);
500 }
501 }
502 }
503
504 (entity_name_to_id, entities_upserted)
505}
506
/// Returns `true` when `relation` is one of a fixed set of generic, low-information
/// relation labels (e.g. `related_to`, `has`, `is`).
///
/// Matching is ASCII case-insensitive and ignores surrounding whitespace. LLM
/// output often carries stray padding, and the APEX-MEM write path trims relations
/// before storing them, so the write gate must see the same trimmed label.
fn is_low_signal_relation(relation: &str) -> bool {
    const LOW_SIGNAL: &[&str] = &[
        "related_to",
        "related to",
        "is related to",
        "associated_with",
        "associated with",
        "has",
        "have",
        "is",
        "are",
        "mentions",
        "mentioned",
        "involves",
        "involved",
    ];
    let relation = relation.trim();
    LOW_SIGNAL.iter().any(|&s| relation.eq_ignore_ascii_case(s))
}
528
529#[allow(clippy::too_many_lines)]
533async fn insert_edges(
534 resolver: &crate::graph::EntityResolver<'_>,
535 edges: &[crate::graph::extractor::ExtractedEdge],
536 name_to_id: &std::collections::HashMap<String, i64>,
537 config: &GraphExtractionConfig,
538) -> usize {
539 let mut edges_inserted = 0usize;
540 for edge in edges {
541 if let Some(min_rel) = config.write_gate_min_relevance {
543 let conf = edge.confidence.unwrap_or(1.0);
544 if conf < min_rel && is_low_signal_relation(&edge.relation) {
545 tracing::debug!(
546 relation = %edge.relation,
547 confidence = conf,
548 threshold = min_rel,
549 "write-gate: skipping low-signal edge"
550 );
551 continue;
552 }
553 }
554 let (Some(&src_id), Some(&tgt_id)) =
555 (name_to_id.get(&edge.source), name_to_id.get(&edge.target))
556 else {
557 tracing::debug!(
558 "graph: skipping edge {:?}->{:?}: entity not resolved",
559 edge.source,
560 edge.target
561 );
562 continue;
563 };
564 if src_id == tgt_id {
565 tracing::debug!(
566 "graph: skipping self-loop edge {:?}->{:?} (entity_id={src_id})",
567 edge.source,
568 edge.target
569 );
570 continue;
571 }
572 let edge_type = edge
575 .edge_type
576 .parse::<crate::graph::EdgeType>()
577 .unwrap_or_else(|_| {
578 tracing::warn!(
579 raw_type = %edge.edge_type,
580 "graph: unknown edge_type from LLM, defaulting to semantic"
581 );
582 crate::graph::EdgeType::Semantic
583 });
584 if config.apex_mem_enabled {
585 let relation_trimmed = edge.relation.trim();
587 let relation_display_clean = strip_control_chars(relation_trimmed);
588 let relation_display =
589 truncate_to_bytes_ref(&relation_display_clean, MAX_RELATION_BYTES).to_owned();
590 let canonical_clean = strip_control_chars(&relation_trimmed.to_lowercase());
591 let canonical_relation =
592 truncate_to_bytes_ref(&canonical_clean, MAX_RELATION_BYTES).to_owned();
593 let fact_clean = strip_control_chars(edge.fact.trim());
594 let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
595 match resolver
596 .graph_store()
597 .insert_or_supersede_with_turn_index_and_metrics(
598 src_id,
599 tgt_id,
600 &relation_display,
601 &canonical_relation,
602 &normalized_fact,
603 edge.confidence.unwrap_or(DEFAULT_EDGE_CONFIDENCE),
604 None,
605 edge_type,
606 true,
607 None,
608 config.turn_index,
609 )
610 .await
611 {
612 Ok(_) => edges_inserted += 1,
613 Err(e) => {
614 tracing::debug!("graph: skipping edge (apex): {e:#}");
615 }
616 }
617 } else {
618 let belief_cfg =
619 config
620 .belief_revision_enabled
621 .then_some(crate::graph::BeliefRevisionConfig {
622 similarity_threshold: config.belief_revision_similarity_threshold,
623 });
624 match resolver
625 .resolve_edge_typed(
626 src_id,
627 tgt_id,
628 &edge.relation,
629 &edge.fact,
630 edge.confidence.unwrap_or(DEFAULT_EDGE_CONFIDENCE),
631 None,
632 edge_type,
633 belief_cfg.as_ref(),
634 )
635 .await
636 {
637 Ok(Some(_)) => edges_inserted += 1,
638 Ok(None) => {} Err(e) => {
640 tracing::debug!("graph: skipping edge: {e:#}");
641 }
642 }
643 }
644 }
645 edges_inserted
646}
647
648async fn link_episode(
650 store: &crate::graph::GraphStore,
651 config: &GraphExtractionConfig,
652 entity_ids: &[i64],
653) {
654 let Some(conv_id) = config.conversation_id else {
655 return;
656 };
657 match store.ensure_episode(conv_id).await {
658 Ok(episode_id) => {
659 for &entity_id in entity_ids {
660 if let Err(e) = store.link_entity_to_episode(episode_id, entity_id).await {
661 tracing::debug!("episode linking skipped for entity {entity_id}: {e:#}");
662 }
663 }
664 }
665 Err(e) => {
666 tracing::warn!("failed to ensure episode for conversation {conv_id}: {e:#}");
667 }
668 }
669}
670
671impl SemanticMemory {
672 pub fn spawn_graph_extraction(
689 &self,
690 content: String,
691 context_messages: Vec<String>,
692 config: GraphExtractionConfig,
693 post_extract_validator: PostExtractValidator,
694 provider_override: Option<AnyProvider>,
695 cancel: CancellationToken,
696 ) -> tokio::task::JoinHandle<()> {
697 let using_override = provider_override.is_some();
698 let provider = provider_override.unwrap_or_else(|| self.provider.clone());
699 if using_override {
700 tracing::debug!(
701 extract_provider = provider.name(),
702 "graph extraction using override provider (quality_gate bypassed)"
703 );
704 }
705 *self
706 .graph_cancel
707 .lock()
708 .expect("graph_cancel mutex poisoned") = Some(cancel.clone());
709
710 let ctx = GraphExtractionTaskCtx {
711 pool: self.sqlite.pool().clone(),
712 provider,
713 failure_counter: self.community_detection_failures.clone(),
714 extraction_count: self.graph_extraction_count.clone(),
715 extraction_failures: self.graph_extraction_failures.clone(),
716 embedding_store: self.qdrant.clone(),
717 cancel,
718 };
719
720 tokio::spawn(run_graph_extraction_task(
721 content,
722 context_messages,
723 config,
724 post_extract_validator,
725 ctx,
726 ))
727 }
728
729 pub fn cancel_graph_extraction(&self) {
745 if let Some(token) = self
746 .graph_cancel
747 .lock()
748 .expect("graph_cancel mutex poisoned")
749 .as_ref()
750 {
751 token.cancel();
752 }
753 }
754}
755
/// Owned state moved into the background graph-extraction task.
struct GraphExtractionTaskCtx {
    /// Database pool used for extraction, note linking, and community refresh.
    pool: DbPool,
    /// LLM provider (default or override) used for every phase of the task.
    provider: AnyProvider,
    /// Incremented when community detection fails.
    failure_counter: Arc<std::sync::atomic::AtomicU64>,
    /// Incremented when an extraction completes successfully.
    extraction_count: Arc<std::sync::atomic::AtomicU64>,
    /// Incremented when an extraction errors or times out.
    extraction_failures: Arc<std::sync::atomic::AtomicU64>,
    /// Optional vector store; note linking is skipped when this is `None`.
    embedding_store: Option<Arc<EmbeddingStore>>,
    /// Cancellation token, consulted between community-refresh steps.
    cancel: CancellationToken,
}
769
770async fn run_graph_extraction_task(
772 content: String,
773 context_messages: Vec<String>,
774 config: GraphExtractionConfig,
775 post_extract_validator: PostExtractValidator,
776 ctx: GraphExtractionTaskCtx,
777) {
778 let timeout_dur = std::time::Duration::from_secs(config.extraction_timeout_secs);
779 let extraction_result = tokio::time::timeout(
780 timeout_dur,
781 extract_and_store(
782 content,
783 context_messages,
784 ctx.provider.clone(),
785 ctx.pool.clone(),
786 config.clone(),
787 post_extract_validator,
788 ctx.embedding_store.clone(),
789 ),
790 )
791 .await;
792
793 let (extraction_ok, new_entity_ids) = match extraction_result {
794 Ok(Ok(result)) => {
795 tracing::debug!(
796 entities = result.stats.entities_upserted,
797 edges = result.stats.edges_inserted,
798 "graph extraction completed"
799 );
800 ctx.extraction_count.fetch_add(1, Ordering::Relaxed);
801 (true, result.entity_ids)
802 }
803 Ok(Err(e)) => {
804 tracing::warn!("graph extraction failed: {e:#}");
805 ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
806 (false, vec![])
807 }
808 Err(_elapsed) => {
809 tracing::warn!("graph extraction timed out");
810 ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
811 (false, vec![])
812 }
813 };
814
815 run_note_linking(
816 extraction_ok,
817 &new_entity_ids,
818 ctx.pool.clone(),
819 ctx.embedding_store,
820 ctx.provider.clone(),
821 &config,
822 )
823 .await;
824
825 maybe_refresh_communities(
826 extraction_ok,
827 ctx.pool,
828 ctx.provider,
829 ctx.failure_counter,
830 &config,
831 ctx.cancel,
832 )
833 .await;
834}
835
836async fn run_note_linking(
838 extraction_ok: bool,
839 new_entity_ids: &[i64],
840 pool: DbPool,
841 embedding_store: Option<Arc<EmbeddingStore>>,
842 provider: AnyProvider,
843 config: &GraphExtractionConfig,
844) {
845 if !extraction_ok || !config.note_linking.enabled || new_entity_ids.is_empty() {
846 return;
847 }
848 let Some(store) = embedding_store else {
849 return;
850 };
851 let linking_timeout = std::time::Duration::from_secs(config.note_linking.timeout_secs);
852 match tokio::time::timeout(
853 linking_timeout,
854 link_memory_notes(new_entity_ids, pool, store, provider, &config.note_linking),
855 )
856 .await
857 {
858 Ok(stats) => {
859 tracing::debug!(
860 entities_processed = stats.entities_processed,
861 edges_created = stats.edges_created,
862 "note linking completed"
863 );
864 }
865 Err(_elapsed) => {
866 tracing::debug!("note linking timed out (partial edges may exist)");
867 }
868 }
869}
870
871async fn maybe_refresh_communities(
878 extraction_ok: bool,
879 pool: DbPool,
880 provider: AnyProvider,
881 failure_counter: Arc<std::sync::atomic::AtomicU64>,
882 config: &GraphExtractionConfig,
883 cancel: CancellationToken,
884) {
885 use crate::graph::GraphStore;
886
887 if !extraction_ok || config.community_refresh_interval == 0 {
888 return;
889 }
890
891 let store = GraphStore::new(pool.clone());
892 let extraction_count = store.extraction_count().await.unwrap_or(0);
893 if extraction_count == 0
894 || !i64::try_from(config.community_refresh_interval)
895 .is_ok_and(|interval| extraction_count % interval == 0)
896 {
897 return;
898 }
899
900 tracing::info!(extraction_count, "triggering community detection refresh");
901 let store2 = GraphStore::new(pool);
902 let retention_days = config.expired_edge_retention_days;
903 let max_cap = config.max_entities_cap;
904 let max_prompt_bytes = config.community_summary_max_prompt_bytes;
905 let concurrency = config.community_summary_concurrency;
906 let edge_chunk_size = config.lpa_edge_chunk_size;
907 let decay_lambda = config.link_weight_decay_lambda;
908 let decay_interval_secs = config.link_weight_decay_interval_secs;
909
910 tokio::select! {
911 () = cancel.cancelled() => {
912 tracing::debug!("community refresh cancelled before community detection");
913 return;
914 }
915 result = crate::graph::community::detect_communities(
916 &store2,
917 &provider,
918 max_prompt_bytes,
919 concurrency,
920 edge_chunk_size,
921 ) => {
922 match result {
923 Ok(count) => {
924 tracing::info!(communities = count, "community detection complete");
925 }
926 Err(e) => {
927 tracing::warn!("community detection failed: {e:#}");
928 failure_counter.fetch_add(1, Ordering::Relaxed);
929 }
930 }
931 }
932 }
933
934 tokio::select! {
935 () = cancel.cancelled() => {
936 tracing::debug!("community refresh cancelled before graph eviction");
937 return;
938 }
939 result = crate::graph::community::run_graph_eviction(&store2, retention_days, max_cap) => {
940 match result {
941 Ok(stats) => {
942 tracing::info!(
943 expired_edges = stats.expired_edges_deleted,
944 orphan_entities = stats.orphan_entities_deleted,
945 capped_entities = stats.capped_entities_deleted,
946 "graph eviction complete"
947 );
948 }
949 Err(e) => {
950 tracing::warn!("graph eviction failed: {e:#}");
951 }
952 }
953 }
954 }
955
956 if decay_lambda > 0.0 && decay_interval_secs > 0 {
958 let now_secs = std::time::SystemTime::now()
959 .duration_since(std::time::UNIX_EPOCH)
960 .map_or(0, |d| d.as_secs());
961 let last_decay = store2
962 .get_metadata("last_link_weight_decay_at")
963 .await
964 .ok()
965 .flatten()
966 .and_then(|s| s.parse::<u64>().ok())
967 .unwrap_or(0);
968 if now_secs.saturating_sub(last_decay) >= decay_interval_secs {
969 tokio::select! {
970 () = cancel.cancelled() => {
971 tracing::debug!("community refresh cancelled before link weight decay");
972 }
973 result = store2.decay_edge_retrieval_counts(decay_lambda, decay_interval_secs) => {
974 match result {
975 Ok(affected) => {
976 tracing::info!(affected, "link weight decay applied");
977 let _ = store2
978 .set_metadata("last_link_weight_decay_at", &now_secs.to_string())
979 .await;
980 }
981 Err(e) => {
982 tracing::warn!("link weight decay failed: {e:#}");
983 }
984 }
985 }
986 }
987 }
988 }
989}
990
991#[cfg(test)]
992mod tests {
993 use std::sync::Arc;
994
995 use zeph_llm::any::AnyProvider;
996
997 use super::{NoteLinkingConfig, extract_and_store};
998 use crate::embedding_store::EmbeddingStore;
999 use crate::graph::GraphStore;
1000 use crate::in_memory_store::InMemoryVectorStore;
1001 use crate::store::SqliteStore;
1002
1003 use super::GraphExtractionConfig;
1004
1005 async fn setup() -> (GraphStore, Arc<EmbeddingStore>) {
1006 let sqlite = SqliteStore::new(":memory:").await.unwrap();
1007 let pool = sqlite.pool().clone();
1008 let mem_store = Box::new(InMemoryVectorStore::new());
1009 let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool.clone()));
1010 let gs = GraphStore::new(pool);
1011 (gs, emb)
1012 }
1013
1014 #[tokio::test]
1017 async fn extract_and_store_sets_qdrant_point_id_when_embedding_store_provided() {
1018 let (gs, emb) = setup().await;
1019
1020 let extraction_json = r#"{"entities":[{"name":"Rust","type":"language","summary":"systems language"}],"edges":[]}"#;
1022 let mut mock =
1023 zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1024 mock.supports_embeddings = true;
1025 mock.embedding = vec![1.0_f32, 0.0, 0.0, 0.0];
1026 let provider = AnyProvider::Mock(mock);
1027
1028 let config = GraphExtractionConfig {
1029 max_entities: 10,
1030 max_edges: 10,
1031 extraction_timeout_secs: 10,
1032 ..Default::default()
1033 };
1034
1035 let result = extract_and_store(
1036 "Rust is a systems programming language.".to_owned(),
1037 vec![],
1038 provider,
1039 gs.pool().clone(),
1040 config,
1041 None,
1042 Some(emb.clone()),
1043 )
1044 .await
1045 .unwrap();
1046
1047 assert_eq!(
1048 result.stats.entities_upserted, 1,
1049 "one entity should be upserted"
1050 );
1051
1052 let entity = gs
1056 .find_entity("rust", crate::graph::EntityType::Language)
1057 .await
1058 .unwrap()
1059 .expect("entity 'rust' must exist in SQLite");
1060
1061 assert!(
1062 entity.qdrant_point_id.is_some(),
1063 "qdrant_point_id must be set when embedding_store + provider are both provided (regression for #1829)"
1064 );
1065 }
1066
1067 #[tokio::test]
1070 async fn extract_and_store_without_embedding_store_still_upserts_entities() {
1071 let (gs, _emb) = setup().await;
1072
1073 let extraction_json = r#"{"entities":[{"name":"Python","type":"language","summary":"scripting"}],"edges":[]}"#;
1074 let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1075 let provider = AnyProvider::Mock(mock);
1076
1077 let config = GraphExtractionConfig {
1078 max_entities: 10,
1079 max_edges: 10,
1080 extraction_timeout_secs: 10,
1081 ..Default::default()
1082 };
1083
1084 let result = extract_and_store(
1085 "Python is a scripting language.".to_owned(),
1086 vec![],
1087 provider,
1088 gs.pool().clone(),
1089 config,
1090 None,
1091 None, )
1093 .await
1094 .unwrap();
1095
1096 assert_eq!(result.stats.entities_upserted, 1);
1097
1098 let entity = gs
1099 .find_entity("python", crate::graph::EntityType::Language)
1100 .await
1101 .unwrap()
1102 .expect("entity 'python' must exist");
1103
1104 assert!(
1105 entity.qdrant_point_id.is_none(),
1106 "qdrant_point_id must remain None when no embedding_store is provided"
1107 );
1108 }
1109
1110 #[tokio::test]
1114 async fn extract_and_store_fts5_cross_session_visibility() {
1115 let file = tempfile::NamedTempFile::new().expect("tempfile");
1116 let path = file.path().to_str().expect("valid path").to_string();
1117
1118 {
1120 let sqlite = crate::store::SqliteStore::new(&path).await.unwrap();
1121 let extraction_json = r#"{"entities":[{"name":"Ferris","type":"concept","summary":"Rust mascot"}],"edges":[]}"#;
1122 let mock =
1123 zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1124 let provider = AnyProvider::Mock(mock);
1125 let config = GraphExtractionConfig {
1126 max_entities: 10,
1127 max_edges: 10,
1128 extraction_timeout_secs: 10,
1129 ..Default::default()
1130 };
1131 extract_and_store(
1132 "Ferris is the Rust mascot.".to_owned(),
1133 vec![],
1134 provider,
1135 sqlite.pool().clone(),
1136 config,
1137 None,
1138 None,
1139 )
1140 .await
1141 .unwrap();
1142 }
1143
1144 let sqlite_b = crate::store::SqliteStore::new(&path).await.unwrap();
1146 let gs_b = crate::graph::GraphStore::new(sqlite_b.pool().clone());
1147 let results = gs_b.find_entities_fuzzy("Ferris", 10).await.unwrap();
1148 assert!(
1149 !results.is_empty(),
1150 "FTS5 cross-session (#2166): entity extracted in session A must be visible in session B"
1151 );
1152 }
1153
1154 #[tokio::test]
1157 async fn extract_and_store_skips_self_loop_edges() {
1158 let (gs, _emb) = setup().await;
1159
1160 let extraction_json = r#"{
1162 "entities":[{"name":"Rust","type":"language","summary":"systems language"}],
1163 "edges":[{"source":"Rust","target":"Rust","relation":"is","fact":"Rust is Rust","edge_type":"semantic"}]
1164 }"#;
1165 let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1166 let provider = AnyProvider::Mock(mock);
1167
1168 let config = GraphExtractionConfig {
1169 max_entities: 10,
1170 max_edges: 10,
1171 extraction_timeout_secs: 10,
1172 ..Default::default()
1173 };
1174
1175 let result = extract_and_store(
1176 "Rust is a language.".to_owned(),
1177 vec![],
1178 provider,
1179 gs.pool().clone(),
1180 config,
1181 None,
1182 None,
1183 )
1184 .await
1185 .unwrap();
1186
1187 assert_eq!(result.stats.entities_upserted, 1);
1188 assert_eq!(
1189 result.stats.edges_inserted, 0,
1190 "self-loop edge must be rejected (#2215)"
1191 );
1192 }
1193
1194 #[tokio::test]
1199 async fn apex_mem_path_inserts_edge_via_insert_or_supersede() {
1200 let (gs, _emb) = setup().await;
1201
1202 let extraction_json = r#"{
1203 "entities":[
1204 {"name":"Alice","type":"person","summary":"a person"},
1205 {"name":"Bob","type":"person","summary":"another person"}
1206 ],
1207 "edges":[
1208 {"source":"Alice","target":"Bob","relation":"KNOWS","fact":"Alice knows Bob","edge_type":"semantic"}
1209 ]
1210 }"#;
1211 let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1212 let provider = AnyProvider::Mock(mock);
1213
1214 let config = GraphExtractionConfig {
1215 max_entities: 10,
1216 max_edges: 10,
1217 extraction_timeout_secs: 10,
1218 apex_mem_enabled: true,
1219 ..Default::default()
1220 };
1221
1222 let result = extract_and_store(
1223 "Alice knows Bob.".to_owned(),
1224 vec![],
1225 provider,
1226 gs.pool().clone(),
1227 config,
1228 None,
1229 None,
1230 )
1231 .await
1232 .unwrap();
1233
1234 assert_eq!(result.stats.entities_upserted, 2, "two entities expected");
1235 assert_eq!(
1236 result.stats.edges_inserted, 1,
1237 "APEX-MEM path must insert the edge and count it (#3631)"
1238 );
1239
1240 let alice_id = gs
1242 .find_entity("alice", crate::graph::EntityType::Person)
1243 .await
1244 .unwrap()
1245 .expect("entity 'alice' must exist")
1246 .id
1247 .0;
1248 let bob_id = gs
1249 .find_entity("bob", crate::graph::EntityType::Person)
1250 .await
1251 .unwrap()
1252 .expect("entity 'bob' must exist")
1253 .id
1254 .0;
1255 let edges = gs.edges_exact(alice_id, bob_id).await.unwrap();
1256 assert_eq!(edges.len(), 1, "exactly one edge expected");
1257 assert_eq!(
1259 edges[0].relation, "KNOWS",
1260 "display relation must preserve original casing"
1261 );
1262 }
1263
1264 #[tokio::test]
1267 async fn embed_work_items_timeout_returns_empty() {
1268 use zeph_llm::mock::MockProvider;
1269
1270 tokio::time::pause();
1273
1274 let mut mock = MockProvider::default();
1276 mock.supports_embeddings = true;
1277 mock.embed_delay_ms = 31_000;
1278 let provider = AnyProvider::Mock(mock);
1279
1280 let work_items = vec![super::EntityWorkItem {
1281 entity_id: 1,
1282 canonical_name: "Alice".to_owned(),
1283 embed_text: "Alice".to_owned(),
1284 self_point_id: None,
1285 }];
1286
1287 let cfg = NoteLinkingConfig {
1288 timeout_secs: 30,
1289 ..NoteLinkingConfig::default()
1290 };
1291 let result = super::embed_work_items(&work_items, &provider, &cfg).await;
1292 assert!(
1293 result.is_empty(),
1294 "embed_work_items must return empty Vec on 30 s timeout (fail-open)"
1295 );
1296 }
1297
1298 #[tokio::test]
1305 async fn maybe_refresh_communities_respects_cancelled_token() {
1306 use tokio_util::sync::CancellationToken;
1307
1308 use crate::graph::GraphStore;
1309 use crate::store::SqliteStore;
1310
1311 let sqlite = SqliteStore::new(":memory:").await.unwrap();
1312 let pool = sqlite.pool().clone();
1313 let gs = GraphStore::new(pool.clone());
1314
1315 gs.set_metadata("extraction_count", "1").await.unwrap();
1317
1318 let config = GraphExtractionConfig {
1319 community_refresh_interval: 1,
1320 ..Default::default()
1321 };
1322
1323 let cancel = CancellationToken::new();
1324 cancel.cancel(); let extraction_json = r#"{"entities":[],"edges":[]}"#;
1327 let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1328 let provider = AnyProvider::Mock(mock);
1329
1330 let failure_counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
1331
1332 super::maybe_refresh_communities(
1335 true,
1336 pool,
1337 provider,
1338 failure_counter.clone(),
1339 &config,
1340 cancel,
1341 )
1342 .await;
1343
1344 assert_eq!(
1345 failure_counter.load(std::sync::atomic::Ordering::Relaxed),
1346 0,
1347 "no failures should be recorded when cancelled before any detection step"
1348 );
1349 }
1350
1351 #[test]
1352 fn is_low_signal_known_values() {
1353 assert!(
1354 super::is_low_signal_relation("related_to"),
1355 "related_to must be low-signal"
1356 );
1357 assert!(
1358 super::is_low_signal_relation("related to"),
1359 "related to (space) must be low-signal"
1360 );
1361 assert!(
1362 super::is_low_signal_relation("IS"),
1363 "IS (uppercase) must be low-signal (case-insensitive)"
1364 );
1365 assert!(
1366 super::is_low_signal_relation("mentions"),
1367 "mentions must be low-signal"
1368 );
1369 }
1370
1371 #[test]
1372 fn is_low_signal_specific_relations_pass() {
1373 assert!(
1374 !super::is_low_signal_relation("causes"),
1375 "causes must NOT be low-signal"
1376 );
1377 assert!(
1378 !super::is_low_signal_relation("works_at"),
1379 "works_at must NOT be low-signal"
1380 );
1381 assert!(
1382 !super::is_low_signal_relation("born_in"),
1383 "born_in must NOT be low-signal"
1384 );
1385 }
1386
1387 #[tokio::test]
1401 async fn extract_and_store_respects_configured_benna_rates() {
1402 use crate::graph::EdgeType;
1403
1404 async fn run_two_inserts(fast_rate: f32, slow_rate: f32) -> crate::graph::types::Edge {
1405 let sqlite = crate::store::SqliteStore::new(":memory:").await.unwrap();
1406 let pool = sqlite.pool().clone();
1407 let gs = GraphStore::new(pool).with_benna_rates(fast_rate, slow_rate);
1408
1409 let alice_id = gs
1410 .upsert_entity("Alice", "alice", crate::graph::EntityType::Person, None)
1411 .await
1412 .unwrap();
1413 let bob_id = gs
1414 .upsert_entity("Bob", "bob", crate::graph::EntityType::Person, None)
1415 .await
1416 .unwrap();
1417
1418 gs.insert_edge_typed(
1420 alice_id.0,
1421 bob_id.0,
1422 "knows",
1423 "Alice knows Bob",
1424 0.6,
1425 None,
1426 EdgeType::Semantic,
1427 )
1428 .await
1429 .unwrap();
1430
1431 gs.insert_edge_typed(
1433 alice_id.0,
1434 bob_id.0,
1435 "knows",
1436 "Alice knows Bob",
1437 0.8,
1438 None,
1439 EdgeType::Semantic,
1440 )
1441 .await
1442 .unwrap();
1443
1444 let mut edges = gs.edges_exact(alice_id.0, bob_id.0).await.unwrap();
1445 assert_eq!(edges.len(), 1, "exactly one active edge expected");
1446 edges.remove(0)
1447 }
1448
1449 let default_edge = run_two_inserts(0.5, 0.05).await;
1450 let custom_edge = run_two_inserts(0.1, 0.02).await;
1451
1452 assert!(
1455 (default_edge.confidence_fast - custom_edge.confidence_fast).abs() > f32::EPSILON,
1456 "confidence_fast must differ between default (0.5) and custom (0.1) benna_fast_rate (#4711)"
1457 );
1458 assert!(
1459 (default_edge.confidence_slow - custom_edge.confidence_slow).abs() > f32::EPSILON,
1460 "confidence_slow must differ between default (0.05) and custom (0.02) benna_slow_rate (#4711)"
1461 );
1462 assert!(
1464 default_edge.confidence_fast > custom_edge.confidence_fast,
1465 "higher benna_fast_rate must produce a larger confidence_fast after merge"
1466 );
1467 }
1468
1469 #[tokio::test]
1476 async fn extract_and_store_forwards_edge_confidence_not_hardcoded_08() {
1477 use crate::graph::{EntityType, GraphStore};
1478
1479 let sqlite = crate::store::SqliteStore::new(":memory:").await.unwrap();
1480 let pool = sqlite.pool().clone();
1481
1482 let extraction_json = r#"{
1484 "entities":[
1485 {"name":"Alice","type":"person","summary":"person"},
1486 {"name":"Bob","type":"person","summary":"person"}
1487 ],
1488 "edges":[{"source":"Alice","target":"Bob","relation":"knows","fact":"Alice knows Bob","edge_type":"semantic","confidence":0.3}]
1489 }"#;
1490 let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1491 let provider = AnyProvider::Mock(mock);
1492 let config = GraphExtractionConfig {
1493 max_entities: 10,
1494 max_edges: 10,
1495 extraction_timeout_secs: 10,
1496 ..Default::default()
1497 };
1498
1499 let result = extract_and_store(
1500 "Alice knows Bob.".to_owned(),
1501 vec![],
1502 provider,
1503 pool.clone(),
1504 config,
1505 None,
1506 None,
1507 )
1508 .await
1509 .unwrap();
1510
1511 assert_eq!(result.stats.edges_inserted, 1, "one edge must be inserted");
1512
1513 let gs = GraphStore::new(pool);
1514 let alice_id: i64 = gs
1515 .find_entity("alice", EntityType::Person)
1516 .await
1517 .unwrap()
1518 .expect("alice must exist")
1519 .id
1520 .0;
1521 let bob_id: i64 = gs
1522 .find_entity("bob", EntityType::Person)
1523 .await
1524 .unwrap()
1525 .expect("bob must exist")
1526 .id
1527 .0;
1528
1529 let mut edges = gs.edges_exact(alice_id, bob_id).await.unwrap();
1530 assert_eq!(edges.len(), 1, "exactly one active edge expected");
1531 let edge = edges.remove(0);
1532
1533 assert!(
1535 (edge.confidence_fast - 0.3_f32).abs() < 0.01,
1536 "confidence_fast must be ~0.3 (from ExtractedEdge.confidence), got {} (regression for #4723)",
1537 edge.confidence_fast
1538 );
1539 }
1540
1541 #[tokio::test]
1545 async fn extract_and_store_apex_forwards_edge_confidence_not_hardcoded_08() {
1546 use crate::graph::{EntityType, GraphStore};
1547
1548 let sqlite = crate::store::SqliteStore::new(":memory:").await.unwrap();
1549 let pool = sqlite.pool().clone();
1550
1551 let extraction_json = r#"{
1553 "entities":[
1554 {"name":"Alice","type":"person","summary":"person"},
1555 {"name":"Bob","type":"person","summary":"person"}
1556 ],
1557 "edges":[{"source":"Alice","target":"Bob","relation":"knows","fact":"Alice knows Bob","edge_type":"semantic","confidence":0.3}]
1558 }"#;
1559 let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1560 let provider = AnyProvider::Mock(mock);
1561 let config = GraphExtractionConfig {
1562 max_entities: 10,
1563 max_edges: 10,
1564 extraction_timeout_secs: 10,
1565 apex_mem_enabled: true,
1566 ..Default::default()
1567 };
1568
1569 let result = extract_and_store(
1570 "Alice knows Bob.".to_owned(),
1571 vec![],
1572 provider,
1573 pool.clone(),
1574 config,
1575 None,
1576 None,
1577 )
1578 .await
1579 .unwrap();
1580
1581 assert_eq!(result.stats.edges_inserted, 1, "one edge must be inserted");
1582
1583 let gs = GraphStore::new(pool);
1584 let alice_id: i64 = gs
1585 .find_entity("alice", EntityType::Person)
1586 .await
1587 .unwrap()
1588 .expect("alice must exist")
1589 .id
1590 .0;
1591 let bob_id: i64 = gs
1592 .find_entity("bob", EntityType::Person)
1593 .await
1594 .unwrap()
1595 .expect("bob must exist")
1596 .id
1597 .0;
1598
1599 let mut edges = gs.edges_exact(alice_id, bob_id).await.unwrap();
1600 assert_eq!(edges.len(), 1, "exactly one active edge expected");
1601 let edge = edges.remove(0);
1602
1603 assert!(
1605 (edge.confidence_fast - 0.3_f32).abs() < 0.01,
1606 "confidence_fast must be ~0.3 (from ExtractedEdge.confidence), got {} (regression for #4723 APEX path)",
1607 edge.confidence_fast
1608 );
1609 }
1610
1611 #[test]
1616 fn graph_extraction_config_benna_defaults_match_graph_store_defaults() {
1617 let cfg = GraphExtractionConfig::default();
1618 assert!(
1620 (cfg.benna_fast_rate - 0.5_f32).abs() < f32::EPSILON,
1621 "benna_fast_rate default must match GraphStore::new default of 0.5"
1622 );
1623 assert!(
1624 (cfg.benna_slow_rate - 0.05_f32).abs() < f32::EPSILON,
1625 "benna_slow_rate default must match GraphStore::new default of 0.05"
1626 );
1627 }
1628}