1use std::sync::Arc;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use std::sync::atomic::Ordering;
9use zeph_db::DbPool;
10
11pub use zeph_common::config::memory::NoteLinkingConfig;
12use zeph_common::sanitize::strip_control_chars;
13use zeph_common::text::truncate_to_bytes_ref;
14use zeph_llm::any::AnyProvider;
15use zeph_llm::provider::LlmProvider as _;
16
17use crate::embedding_store::EmbeddingStore;
18use crate::error::MemoryError;
19use crate::graph::extractor::ExtractionResult as ExtractorResult;
20use crate::vector_store::VectorFilter;
21
22use super::SemanticMemory;
23
24pub type PostExtractValidator = Option<Box<dyn Fn(&ExtractorResult) -> Result<(), String> + Send>>;
29
/// Tuning knobs for one background graph-extraction run and its follow-up maintenance
/// (note linking, community refresh, eviction and link-weight decay).
#[derive(Debug, Clone)]
pub struct GraphExtractionConfig {
    /// Entity limit passed to `GraphExtractor::new`.
    pub max_entities: usize,
    /// Edge limit passed to `GraphExtractor::new`.
    pub max_edges: usize,
    /// Timeout in seconds applied to the whole `extract_and_store` call in the spawned task.
    pub extraction_timeout_secs: u64,
    /// Community detection runs when the extraction count is a positive multiple of this
    /// value; `0` disables the refresh.
    pub community_refresh_interval: usize,
    /// Retention (days) passed to graph eviction during a community refresh.
    pub expired_edge_retention_days: u32,
    /// Entity cap passed to graph eviction during a community refresh.
    pub max_entities_cap: usize,
    /// Prompt byte budget passed to `detect_communities`.
    pub community_summary_max_prompt_bytes: usize,
    /// Concurrency passed to `detect_communities`.
    pub community_summary_concurrency: usize,
    /// Edge chunk size passed to `detect_communities` (the `lpa_` prefix suggests
    /// label propagation — confirm in `graph::community`).
    pub lpa_edge_chunk_size: usize,
    /// Settings for the post-extraction `similar_to` note-linking pass.
    pub note_linking: NoteLinkingConfig,
    /// Factor passed to `decay_edge_retrieval_counts`; a value `<= 0.0` disables decay.
    pub link_weight_decay_lambda: f64,
    /// Minimum wall-clock seconds between decay runs; `0` disables decay.
    pub link_weight_decay_interval_secs: u64,
    /// Enables belief revision on the non-APEX edge path (`resolve_edge_typed`).
    pub belief_revision_enabled: bool,
    /// Similarity threshold for belief revision; only read when it is enabled.
    pub belief_revision_similarity_threshold: f32,
    /// When set, every resolved entity is linked to this conversation's episode.
    pub conversation_id: Option<i64>,
    /// Routes edge writes through `insert_or_supersede` (with sanitized, byte-capped text)
    /// instead of `resolve_edge_typed`.
    pub apex_mem_enabled: bool,
}
61
62impl Default for GraphExtractionConfig {
63 fn default() -> Self {
64 Self {
65 max_entities: 0,
66 max_edges: 0,
67 extraction_timeout_secs: 0,
68 community_refresh_interval: 0,
69 expired_edge_retention_days: 0,
70 max_entities_cap: 0,
71 community_summary_max_prompt_bytes: 0,
72 community_summary_concurrency: 0,
73 lpa_edge_chunk_size: 0,
74 note_linking: NoteLinkingConfig::default(),
75 link_weight_decay_lambda: 0.95,
76 link_weight_decay_interval_secs: 86400,
77 belief_revision_enabled: false,
78 belief_revision_similarity_threshold: 0.85,
79 conversation_id: None,
80 apex_mem_enabled: false,
81 }
82 }
83}
84
/// Counters reported by a single [`extract_and_store`] run.
#[derive(Debug, Default)]
pub struct ExtractionStats {
    /// Extracted entities the resolver accepted (one per successful `resolve` call).
    pub entities_upserted: usize,
    /// Edges written: APEX-MEM `insert_or_supersede` succeeded, or `resolve_edge_typed`
    /// returned `Some`.
    pub edges_inserted: usize,
}
91
/// Outcome of [`extract_and_store`]: counters plus the IDs of the resolved entities.
#[derive(Debug, Default)]
pub struct ExtractionResult {
    /// Upsert / insert counters for this run.
    pub stats: ExtractionStats,
    /// IDs of entities resolved in this run; used for episode linking and note linking.
    pub entity_ids: Vec<i64>,
}
99
/// Counters reported by [`link_memory_notes`].
#[derive(Debug, Default)]
pub struct LinkingStats {
    /// Entities whose similarity search completed successfully.
    pub entities_processed: usize,
    /// `similar_to` edges successfully inserted.
    pub edges_created: usize,
}
106
/// Vector collection searched for similar entities during note linking.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Byte cap applied to relation strings on the APEX-MEM edge path.
const MAX_RELATION_BYTES: usize = 256;
/// Byte cap applied to edge facts on the APEX-MEM edge path.
const MAX_FACT_BYTES: usize = 2048;
114
/// One entity queued for note linking.
struct EntityWorkItem {
    /// SQLite ID of the entity.
    entity_id: i64,
    /// Canonical name; only used in log messages.
    canonical_name: String,
    /// Text embedded for the similarity search: `"name: summary"`, or the bare name when
    /// there is no non-empty summary.
    embed_text: String,
    /// The entity's own vector point ID, if recorded, so its self-match can be filtered out.
    self_point_id: Option<String>,
}
122
123pub async fn link_memory_notes(
139 entity_ids: &[i64],
140 pool: DbPool,
141 embedding_store: Arc<EmbeddingStore>,
142 provider: AnyProvider,
143 cfg: &NoteLinkingConfig,
144) -> LinkingStats {
145 use crate::graph::GraphStore;
146
147 let store = GraphStore::new(pool);
148 let mut stats = LinkingStats::default();
149
150 let work_items = collect_note_link_work_items(entity_ids, &store).await;
151 if work_items.is_empty() {
152 return stats;
153 }
154
155 let valid = embed_work_items(&work_items, &provider).await;
156
157 let search_limit = cfg.top_k + 1; let search_results = search_similar_for_items(&valid, &embedding_store, search_limit).await;
159
160 insert_similarity_edges(
161 &work_items,
162 &valid,
163 &search_results,
164 cfg,
165 &store,
166 &mut stats,
167 )
168 .await;
169
170 stats
171}
172
173async fn collect_note_link_work_items(
177 entity_ids: &[i64],
178 store: &crate::graph::GraphStore,
179) -> Vec<EntityWorkItem> {
180 let mut work_items: Vec<EntityWorkItem> = Vec::with_capacity(entity_ids.len());
181 for &entity_id in entity_ids {
182 let entity = match store.find_entity_by_id(entity_id).await {
183 Ok(Some(e)) => e,
184 Ok(None) => {
185 tracing::debug!("note_linking: entity {entity_id} not found, skipping");
186 continue;
187 }
188 Err(e) => {
189 tracing::debug!("note_linking: DB error loading entity {entity_id}: {e:#}");
190 continue;
191 }
192 };
193 let embed_text = match &entity.summary {
194 Some(s) if !s.is_empty() => format!("{}: {s}", entity.canonical_name),
195 _ => entity.canonical_name.clone(),
196 };
197 work_items.push(EntityWorkItem {
198 entity_id,
199 canonical_name: entity.canonical_name,
200 embed_text,
201 self_point_id: entity.qdrant_point_id,
202 });
203 }
204 work_items
205}
206
207async fn embed_work_items(
212 work_items: &[EntityWorkItem],
213 provider: &AnyProvider,
214) -> Vec<(usize, Vec<f32>)> {
215 use futures::future;
216
217 let embed_results: Vec<_> =
218 future::join_all(work_items.iter().map(|w| provider.embed(&w.embed_text))).await;
219
220 embed_results
221 .into_iter()
222 .enumerate()
223 .filter_map(|(i, r)| match r {
224 Ok(v) => Some((i, v)),
225 Err(e) => {
226 tracing::debug!(
227 "note_linking: embed failed for entity {:?}: {e:#}",
228 work_items[i].canonical_name
229 );
230 None
231 }
232 })
233 .collect()
234}
235
236async fn search_similar_for_items(
238 valid: &[(usize, Vec<f32>)],
239 embedding_store: &EmbeddingStore,
240 search_limit: usize,
241) -> Vec<Result<Vec<crate::ScoredVectorPoint>, MemoryError>> {
242 use futures::future;
243
244 future::join_all(valid.iter().map(|(_, vec)| {
245 embedding_store.search_collection(
246 ENTITY_COLLECTION,
247 vec,
248 search_limit,
249 None::<VectorFilter>,
250 )
251 }))
252 .await
253}
254
255async fn insert_similarity_edges(
260 work_items: &[EntityWorkItem],
261 valid: &[(usize, Vec<f32>)],
262 search_results: &[Result<Vec<crate::ScoredVectorPoint>, MemoryError>],
263 cfg: &NoteLinkingConfig,
264 store: &crate::graph::GraphStore,
265 stats: &mut LinkingStats,
266) {
267 let mut seen_pairs = std::collections::HashSet::new();
268
269 for ((work_idx, _), search_result) in valid.iter().zip(search_results.iter()) {
270 let w = &work_items[*work_idx];
271
272 let results = match search_result {
273 Ok(r) => r,
274 Err(e) => {
275 tracing::debug!(
276 "note_linking: search failed for entity {:?}: {e:#}",
277 w.canonical_name
278 );
279 continue;
280 }
281 };
282
283 stats.entities_processed += 1;
284
285 let self_point_id = w.self_point_id.as_deref();
286 let candidates = results
287 .iter()
288 .filter(|p| Some(p.id.as_str()) != self_point_id && p.score >= cfg.similarity_threshold)
289 .take(cfg.top_k);
290
291 for point in candidates {
292 let Some(target_id) = point
293 .payload
294 .get("entity_id")
295 .and_then(serde_json::Value::as_i64)
296 else {
297 tracing::debug!(
298 "note_linking: missing entity_id in payload for point {}",
299 point.id
300 );
301 continue;
302 };
303
304 if target_id == w.entity_id {
305 continue; }
307
308 let (src, tgt) = if w.entity_id < target_id {
310 (w.entity_id, target_id)
311 } else {
312 (target_id, w.entity_id)
313 };
314
315 if !seen_pairs.insert((src, tgt)) {
316 continue;
317 }
318
319 let fact = format!("Semantically similar entities (score: {:.3})", point.score);
320
321 match store
322 .insert_edge(src, tgt, "similar_to", &fact, point.score, None)
323 .await
324 {
325 Ok(_) => stats.edges_created += 1,
326 Err(e) => {
327 tracing::debug!("note_linking: insert_edge failed: {e:#}");
328 }
329 }
330 }
331 }
332}
333
334#[cfg_attr(
345 feature = "profiling",
346 tracing::instrument(name = "memory.graph_extract", skip_all, fields(entities = tracing::field::Empty, edges = tracing::field::Empty))
347)]
348pub async fn extract_and_store(
349 content: String,
350 context_messages: Vec<String>,
351 provider: AnyProvider,
352 pool: DbPool,
353 config: GraphExtractionConfig,
354 post_extract_validator: PostExtractValidator,
355 embedding_store: Option<Arc<EmbeddingStore>>,
356) -> Result<ExtractionResult, MemoryError> {
357 use crate::graph::{EntityResolver, GraphExtractor, GraphStore};
358
359 let extractor = GraphExtractor::new(provider.clone(), config.max_entities, config.max_edges);
360 let ctx_refs: Vec<&str> = context_messages.iter().map(String::as_str).collect();
361
362 let store = GraphStore::new(pool);
363
364 bump_extraction_count(store.pool()).await?;
365
366 let Some(result) = extractor.extract(&content, &ctx_refs).await? else {
367 return Ok(ExtractionResult::default());
368 };
369
370 if let Some(ref validator) = post_extract_validator
373 && let Err(reason) = validator(&result)
374 {
375 tracing::warn!(
376 reason,
377 "graph extraction validation failed, skipping upsert"
378 );
379 return Ok(ExtractionResult::default());
380 }
381
382 let resolver = if let Some(ref emb) = embedding_store {
383 EntityResolver::new(&store)
384 .with_embedding_store(emb)
385 .with_provider(&provider)
386 } else {
387 EntityResolver::new(&store)
388 };
389
390 let (entity_name_to_id, entities_upserted) = upsert_entities(&resolver, &result.entities).await;
391 let edges_inserted = insert_edges(&resolver, &result.edges, &entity_name_to_id, &config).await;
392
393 store.checkpoint_wal().await?;
394
395 let new_entity_ids: Vec<i64> = entity_name_to_id.into_values().collect();
396
397 link_episode(&store, &config, &new_entity_ids).await;
398
399 #[cfg(feature = "profiling")]
400 {
401 let span = tracing::Span::current();
402 span.record("entities", entities_upserted);
403 span.record("edges", edges_inserted);
404 }
405
406 Ok(ExtractionResult {
407 stats: ExtractionStats {
408 entities_upserted,
409 edges_inserted,
410 },
411 entity_ids: new_entity_ids,
412 })
413}
414
415async fn bump_extraction_count(pool: &DbPool) -> Result<(), MemoryError> {
417 zeph_db::query(sql!(
418 "INSERT INTO graph_metadata (key, value) VALUES ('extraction_count', '0')
419 ON CONFLICT(key) DO NOTHING"
420 ))
421 .execute(pool)
422 .await?;
423 zeph_db::query(sql!(
424 "UPDATE graph_metadata
425 SET value = CAST(CAST(value AS INTEGER) + 1 AS TEXT)
426 WHERE key = 'extraction_count'"
427 ))
428 .execute(pool)
429 .await?;
430 Ok(())
431}
432
433async fn upsert_entities(
435 resolver: &crate::graph::EntityResolver<'_>,
436 entities: &[crate::graph::extractor::ExtractedEntity],
437) -> (std::collections::HashMap<String, i64>, usize) {
438 let mut entity_name_to_id: std::collections::HashMap<String, i64> =
439 std::collections::HashMap::new();
440 let mut entities_upserted = 0usize;
441
442 for entity in entities {
443 match resolver
444 .resolve(&entity.name, &entity.entity_type, entity.summary.as_deref())
445 .await
446 {
447 Ok((id, _outcome)) => {
448 entity_name_to_id.insert(entity.name.clone(), id);
449 entities_upserted += 1;
450 }
451 Err(e) => {
452 tracing::debug!("graph: skipping entity {:?}: {e:#}", entity.name);
453 }
454 }
455 }
456
457 (entity_name_to_id, entities_upserted)
458}
459
460async fn insert_edges(
464 resolver: &crate::graph::EntityResolver<'_>,
465 edges: &[crate::graph::extractor::ExtractedEdge],
466 name_to_id: &std::collections::HashMap<String, i64>,
467 config: &GraphExtractionConfig,
468) -> usize {
469 let mut edges_inserted = 0usize;
470 for edge in edges {
471 let (Some(&src_id), Some(&tgt_id)) =
472 (name_to_id.get(&edge.source), name_to_id.get(&edge.target))
473 else {
474 tracing::debug!(
475 "graph: skipping edge {:?}->{:?}: entity not resolved",
476 edge.source,
477 edge.target
478 );
479 continue;
480 };
481 if src_id == tgt_id {
482 tracing::debug!(
483 "graph: skipping self-loop edge {:?}->{:?} (entity_id={src_id})",
484 edge.source,
485 edge.target
486 );
487 continue;
488 }
489 let edge_type = edge
492 .edge_type
493 .parse::<crate::graph::EdgeType>()
494 .unwrap_or_else(|_| {
495 tracing::warn!(
496 raw_type = %edge.edge_type,
497 "graph: unknown edge_type from LLM, defaulting to semantic"
498 );
499 crate::graph::EdgeType::Semantic
500 });
501 if config.apex_mem_enabled {
502 let relation_trimmed = edge.relation.trim();
504 let relation_display_clean = strip_control_chars(relation_trimmed);
505 let relation_display =
506 truncate_to_bytes_ref(&relation_display_clean, MAX_RELATION_BYTES).to_owned();
507 let canonical_clean = strip_control_chars(&relation_trimmed.to_lowercase());
508 let canonical_relation =
509 truncate_to_bytes_ref(&canonical_clean, MAX_RELATION_BYTES).to_owned();
510 let fact_clean = strip_control_chars(edge.fact.trim());
511 let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
512 match resolver
513 .graph_store()
514 .insert_or_supersede(
515 src_id,
516 tgt_id,
517 &relation_display,
518 &canonical_relation,
519 &normalized_fact,
520 0.8,
521 None,
522 edge_type,
523 true,
524 )
525 .await
526 {
527 Ok(_) => edges_inserted += 1,
528 Err(e) => {
529 tracing::debug!("graph: skipping edge (apex): {e:#}");
530 }
531 }
532 } else {
533 let belief_cfg =
534 config
535 .belief_revision_enabled
536 .then_some(crate::graph::BeliefRevisionConfig {
537 similarity_threshold: config.belief_revision_similarity_threshold,
538 });
539 match resolver
540 .resolve_edge_typed(
541 src_id,
542 tgt_id,
543 &edge.relation,
544 &edge.fact,
545 0.8,
546 None,
547 edge_type,
548 belief_cfg.as_ref(),
549 )
550 .await
551 {
552 Ok(Some(_)) => edges_inserted += 1,
553 Ok(None) => {} Err(e) => {
555 tracing::debug!("graph: skipping edge: {e:#}");
556 }
557 }
558 }
559 }
560 edges_inserted
561}
562
563async fn link_episode(
565 store: &crate::graph::GraphStore,
566 config: &GraphExtractionConfig,
567 entity_ids: &[i64],
568) {
569 let Some(conv_id) = config.conversation_id else {
570 return;
571 };
572 match store.ensure_episode(conv_id).await {
573 Ok(episode_id) => {
574 for &entity_id in entity_ids {
575 if let Err(e) = store.link_entity_to_episode(episode_id, entity_id).await {
576 tracing::debug!("episode linking skipped for entity {entity_id}: {e:#}");
577 }
578 }
579 }
580 Err(e) => {
581 tracing::warn!("failed to ensure episode for conversation {conv_id}: {e:#}");
582 }
583 }
584}
585
586impl SemanticMemory {
587 pub fn spawn_graph_extraction(
599 &self,
600 content: String,
601 context_messages: Vec<String>,
602 config: GraphExtractionConfig,
603 post_extract_validator: PostExtractValidator,
604 provider_override: Option<AnyProvider>,
605 ) -> tokio::task::JoinHandle<()> {
606 let using_override = provider_override.is_some();
607 let provider = provider_override.unwrap_or_else(|| self.provider.clone());
608 if using_override {
609 tracing::debug!(
610 extract_provider = provider.name(),
611 "graph extraction using override provider (quality_gate bypassed)"
612 );
613 }
614 let ctx = GraphExtractionTaskCtx {
615 pool: self.sqlite.pool().clone(),
616 provider,
617 failure_counter: self.community_detection_failures.clone(),
618 extraction_count: self.graph_extraction_count.clone(),
619 extraction_failures: self.graph_extraction_failures.clone(),
620 embedding_store: self.qdrant.clone(),
621 };
622
623 tokio::spawn(run_graph_extraction_task(
624 content,
625 context_messages,
626 config,
627 post_extract_validator,
628 ctx,
629 ))
630 }
631}
632
/// Shared handles moved into the background extraction task.
struct GraphExtractionTaskCtx {
    /// Database pool for graph reads/writes.
    pool: DbPool,
    /// Provider used for extraction, note-linking embeddings and community detection.
    provider: AnyProvider,
    /// Incremented when community detection fails.
    failure_counter: Arc<std::sync::atomic::AtomicU64>,
    /// Incremented after each successful extraction.
    extraction_count: Arc<std::sync::atomic::AtomicU64>,
    /// Incremented when extraction errors or times out.
    extraction_failures: Arc<std::sync::atomic::AtomicU64>,
    /// Vector store; `None` means the resolver gets no embedding store and note linking is
    /// skipped.
    embedding_store: Option<Arc<EmbeddingStore>>,
}
644
645async fn run_graph_extraction_task(
647 content: String,
648 context_messages: Vec<String>,
649 config: GraphExtractionConfig,
650 post_extract_validator: PostExtractValidator,
651 ctx: GraphExtractionTaskCtx,
652) {
653 let timeout_dur = std::time::Duration::from_secs(config.extraction_timeout_secs);
654 let extraction_result = tokio::time::timeout(
655 timeout_dur,
656 extract_and_store(
657 content,
658 context_messages,
659 ctx.provider.clone(),
660 ctx.pool.clone(),
661 config.clone(),
662 post_extract_validator,
663 ctx.embedding_store.clone(),
664 ),
665 )
666 .await;
667
668 let (extraction_ok, new_entity_ids) = match extraction_result {
669 Ok(Ok(result)) => {
670 tracing::debug!(
671 entities = result.stats.entities_upserted,
672 edges = result.stats.edges_inserted,
673 "graph extraction completed"
674 );
675 ctx.extraction_count.fetch_add(1, Ordering::Relaxed);
676 (true, result.entity_ids)
677 }
678 Ok(Err(e)) => {
679 tracing::warn!("graph extraction failed: {e:#}");
680 ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
681 (false, vec![])
682 }
683 Err(_elapsed) => {
684 tracing::warn!("graph extraction timed out");
685 ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
686 (false, vec![])
687 }
688 };
689
690 run_note_linking(
691 extraction_ok,
692 &new_entity_ids,
693 ctx.pool.clone(),
694 ctx.embedding_store,
695 ctx.provider.clone(),
696 &config,
697 )
698 .await;
699
700 maybe_refresh_communities(
701 extraction_ok,
702 ctx.pool,
703 ctx.provider,
704 ctx.failure_counter,
705 &config,
706 )
707 .await;
708}
709
710async fn run_note_linking(
712 extraction_ok: bool,
713 new_entity_ids: &[i64],
714 pool: DbPool,
715 embedding_store: Option<Arc<EmbeddingStore>>,
716 provider: AnyProvider,
717 config: &GraphExtractionConfig,
718) {
719 if !extraction_ok || !config.note_linking.enabled || new_entity_ids.is_empty() {
720 return;
721 }
722 let Some(store) = embedding_store else {
723 return;
724 };
725 let linking_timeout = std::time::Duration::from_secs(config.note_linking.timeout_secs);
726 match tokio::time::timeout(
727 linking_timeout,
728 link_memory_notes(new_entity_ids, pool, store, provider, &config.note_linking),
729 )
730 .await
731 {
732 Ok(stats) => {
733 tracing::debug!(
734 entities_processed = stats.entities_processed,
735 edges_created = stats.edges_created,
736 "note linking completed"
737 );
738 }
739 Err(_elapsed) => {
740 tracing::debug!("note linking timed out (partial edges may exist)");
741 }
742 }
743}
744
745async fn maybe_refresh_communities(
748 extraction_ok: bool,
749 pool: DbPool,
750 provider: AnyProvider,
751 failure_counter: Arc<std::sync::atomic::AtomicU64>,
752 config: &GraphExtractionConfig,
753) {
754 use crate::graph::GraphStore;
755
756 if !extraction_ok || config.community_refresh_interval == 0 {
757 return;
758 }
759
760 let store = GraphStore::new(pool.clone());
761 let extraction_count = store.extraction_count().await.unwrap_or(0);
762 if extraction_count == 0
763 || !i64::try_from(config.community_refresh_interval)
764 .is_ok_and(|interval| extraction_count % interval == 0)
765 {
766 return;
767 }
768
769 tracing::info!(extraction_count, "triggering community detection refresh");
770 let store2 = GraphStore::new(pool);
771 let provider2 = provider;
772 let retention_days = config.expired_edge_retention_days;
773 let max_cap = config.max_entities_cap;
774 let max_prompt_bytes = config.community_summary_max_prompt_bytes;
775 let concurrency = config.community_summary_concurrency;
776 let edge_chunk_size = config.lpa_edge_chunk_size;
777 let decay_lambda = config.link_weight_decay_lambda;
778 let decay_interval_secs = config.link_weight_decay_interval_secs;
779 tokio::spawn(async move {
780 match crate::graph::community::detect_communities(
781 &store2,
782 &provider2,
783 max_prompt_bytes,
784 concurrency,
785 edge_chunk_size,
786 )
787 .await
788 {
789 Ok(count) => {
790 tracing::info!(communities = count, "community detection complete");
791 }
792 Err(e) => {
793 tracing::warn!("community detection failed: {e:#}");
794 failure_counter.fetch_add(1, Ordering::Relaxed);
795 }
796 }
797 match crate::graph::community::run_graph_eviction(&store2, retention_days, max_cap).await {
798 Ok(stats) => {
799 tracing::info!(
800 expired_edges = stats.expired_edges_deleted,
801 orphan_entities = stats.orphan_entities_deleted,
802 capped_entities = stats.capped_entities_deleted,
803 "graph eviction complete"
804 );
805 }
806 Err(e) => {
807 tracing::warn!("graph eviction failed: {e:#}");
808 }
809 }
810
811 if decay_lambda > 0.0 && decay_interval_secs > 0 {
813 let now_secs = std::time::SystemTime::now()
814 .duration_since(std::time::UNIX_EPOCH)
815 .map_or(0, |d| d.as_secs());
816 let last_decay = store2
817 .get_metadata("last_link_weight_decay_at")
818 .await
819 .ok()
820 .flatten()
821 .and_then(|s| s.parse::<u64>().ok())
822 .unwrap_or(0);
823 if now_secs.saturating_sub(last_decay) >= decay_interval_secs {
824 match store2
825 .decay_edge_retrieval_counts(decay_lambda, decay_interval_secs)
826 .await
827 {
828 Ok(affected) => {
829 tracing::info!(affected, "link weight decay applied");
830 let _ = store2
831 .set_metadata("last_link_weight_decay_at", &now_secs.to_string())
832 .await;
833 }
834 Err(e) => {
835 tracing::warn!("link weight decay failed: {e:#}");
836 }
837 }
838 }
839 }
840 });
841}
842
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use zeph_llm::any::AnyProvider;

    use super::extract_and_store;
    use crate::embedding_store::EmbeddingStore;
    use crate::graph::GraphStore;
    use crate::in_memory_store::InMemoryVectorStore;
    use crate::store::SqliteStore;

    use super::GraphExtractionConfig;

    /// Builds an in-memory SQLite graph store plus an embedding store backed by
    /// `InMemoryVectorStore`, both sharing the same pool.
    async fn setup() -> (GraphStore, Arc<EmbeddingStore>) {
        let sqlite = SqliteStore::new(":memory:").await.unwrap();
        let pool = sqlite.pool().clone();
        let mem_store = Box::new(InMemoryVectorStore::new());
        let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool.clone()));
        let gs = GraphStore::new(pool);
        (gs, emb)
    }

    /// Regression for #1829: with an embedding store and an embedding-capable provider,
    /// the upserted entity must end up with a `qdrant_point_id`.
    #[tokio::test]
    async fn extract_and_store_sets_qdrant_point_id_when_embedding_store_provided() {
        let (gs, emb) = setup().await;

        let extraction_json = r#"{"entities":[{"name":"Rust","type":"language","summary":"systems language"}],"edges":[]}"#;
        // The mock returns the extraction JSON; `supports_embeddings`/`embedding` presumably
        // make `embed` succeed with this fixed vector (MockProvider semantics, confirm).
        let mut mock =
            zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
        mock.supports_embeddings = true;
        mock.embedding = vec![1.0_f32, 0.0, 0.0, 0.0];
        let provider = AnyProvider::Mock(mock);

        let config = GraphExtractionConfig {
            max_entities: 10,
            max_edges: 10,
            extraction_timeout_secs: 10,
            ..Default::default()
        };

        let result = extract_and_store(
            "Rust is a systems programming language.".to_owned(),
            vec![],
            provider,
            gs.pool().clone(),
            config,
            None,
            Some(emb.clone()),
        )
        .await
        .unwrap();

        assert_eq!(
            result.stats.entities_upserted, 1,
            "one entity should be upserted"
        );

        // Looked up by its lowercase canonical name.
        let entity = gs
            .find_entity("rust", crate::graph::EntityType::Language)
            .await
            .unwrap()
            .expect("entity 'rust' must exist in SQLite");

        assert!(
            entity.qdrant_point_id.is_some(),
            "qdrant_point_id must be set when embedding_store + provider are both provided (regression for #1829)"
        );
    }

    /// Without an embedding store, entities are still upserted but get no vector point.
    #[tokio::test]
    async fn extract_and_store_without_embedding_store_still_upserts_entities() {
        let (gs, _emb) = setup().await;

        let extraction_json = r#"{"entities":[{"name":"Python","type":"language","summary":"scripting"}],"edges":[]}"#;
        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
        let provider = AnyProvider::Mock(mock);

        let config = GraphExtractionConfig {
            max_entities: 10,
            max_edges: 10,
            extraction_timeout_secs: 10,
            ..Default::default()
        };

        let result = extract_and_store(
            "Python is a scripting language.".to_owned(),
            vec![],
            provider,
            gs.pool().clone(),
            config,
            None,
            None, // no embedding store
        )
        .await
        .unwrap();

        assert_eq!(result.stats.entities_upserted, 1);

        let entity = gs
            .find_entity("python", crate::graph::EntityType::Language)
            .await
            .unwrap()
            .expect("entity 'python' must exist");

        assert!(
            entity.qdrant_point_id.is_none(),
            "qdrant_point_id must remain None when no embedding_store is provided"
        );
    }

    /// Regression for #2166: an entity written through one `SqliteStore` on a file-backed
    /// database must be found by fuzzy search from a second, freshly opened store.
    #[tokio::test]
    async fn extract_and_store_fts5_cross_session_visibility() {
        let file = tempfile::NamedTempFile::new().expect("tempfile");
        let path = file.path().to_str().expect("valid path").to_string();

        {
            // Session A: extract and store; the store is dropped at the end of this scope.
            let sqlite = crate::store::SqliteStore::new(&path).await.unwrap();
            let extraction_json = r#"{"entities":[{"name":"Ferris","type":"concept","summary":"Rust mascot"}],"edges":[]}"#;
            let mock =
                zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
            let provider = AnyProvider::Mock(mock);
            let config = GraphExtractionConfig {
                max_entities: 10,
                max_edges: 10,
                extraction_timeout_secs: 10,
                ..Default::default()
            };
            extract_and_store(
                "Ferris is the Rust mascot.".to_owned(),
                vec![],
                provider,
                sqlite.pool().clone(),
                config,
                None,
                None,
            )
            .await
            .unwrap();
        }

        // Session B: reopen the same file and search.
        let sqlite_b = crate::store::SqliteStore::new(&path).await.unwrap();
        let gs_b = crate::graph::GraphStore::new(sqlite_b.pool().clone());
        let results = gs_b.find_entities_fuzzy("Ferris", 10).await.unwrap();
        assert!(
            !results.is_empty(),
            "FTS5 cross-session (#2166): entity extracted in session A must be visible in session B"
        );
    }

    /// Regression for #2215: an edge whose source and target resolve to the same entity
    /// must not be inserted.
    #[tokio::test]
    async fn extract_and_store_skips_self_loop_edges() {
        let (gs, _emb) = setup().await;

        let extraction_json = r#"{
            "entities":[{"name":"Rust","type":"language","summary":"systems language"}],
            "edges":[{"source":"Rust","target":"Rust","relation":"is","fact":"Rust is Rust","edge_type":"semantic"}]
        }"#;
        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
        let provider = AnyProvider::Mock(mock);

        let config = GraphExtractionConfig {
            max_entities: 10,
            max_edges: 10,
            extraction_timeout_secs: 10,
            ..Default::default()
        };

        let result = extract_and_store(
            "Rust is a language.".to_owned(),
            vec![],
            provider,
            gs.pool().clone(),
            config,
            None,
            None,
        )
        .await
        .unwrap();

        assert_eq!(result.stats.entities_upserted, 1);
        assert_eq!(
            result.stats.edges_inserted, 0,
            "self-loop edge must be rejected (#2215)"
        );
    }

    /// Regression for #3631: with `apex_mem_enabled`, the edge goes through
    /// `insert_or_supersede`, is counted, and keeps the original relation casing.
    #[tokio::test]
    async fn apex_mem_path_inserts_edge_via_insert_or_supersede() {
        let (gs, _emb) = setup().await;

        let extraction_json = r#"{
            "entities":[
                {"name":"Alice","type":"person","summary":"a person"},
                {"name":"Bob","type":"person","summary":"another person"}
            ],
            "edges":[
                {"source":"Alice","target":"Bob","relation":"KNOWS","fact":"Alice knows Bob","edge_type":"semantic"}
            ]
        }"#;
        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
        let provider = AnyProvider::Mock(mock);

        let config = GraphExtractionConfig {
            max_entities: 10,
            max_edges: 10,
            extraction_timeout_secs: 10,
            apex_mem_enabled: true,
            ..Default::default()
        };

        let result = extract_and_store(
            "Alice knows Bob.".to_owned(),
            vec![],
            provider,
            gs.pool().clone(),
            config,
            None,
            None,
        )
        .await
        .unwrap();

        assert_eq!(result.stats.entities_upserted, 2, "two entities expected");
        assert_eq!(
            result.stats.edges_inserted, 1,
            "APEX-MEM path must insert the edge and count it (#3631)"
        );

        // Entities are looked up by their lowercase canonical names.
        let alice_id = gs
            .find_entity("alice", crate::graph::EntityType::Person)
            .await
            .unwrap()
            .expect("entity 'alice' must exist")
            .id;
        let bob_id = gs
            .find_entity("bob", crate::graph::EntityType::Person)
            .await
            .unwrap()
            .expect("entity 'bob' must exist")
            .id;
        let edges = gs.edges_exact(alice_id, bob_id).await.unwrap();
        assert_eq!(edges.len(), 1, "exactly one edge expected");
        // The display relation keeps the LLM's casing; only the canonical form is lowercased.
        assert_eq!(
            edges[0].relation, "KNOWS",
            "display relation must preserve original casing"
        );
    }
}