1use std::sync::Arc;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use std::sync::atomic::Ordering;
9use zeph_db::DbPool;
10
11pub use zeph_common::config::memory::NoteLinkingConfig;
12use zeph_llm::any::AnyProvider;
13use zeph_llm::provider::LlmProvider as _;
14
15use crate::embedding_store::EmbeddingStore;
16use crate::error::MemoryError;
17use crate::graph::extractor::ExtractionResult as ExtractorResult;
18use crate::vector_store::VectorFilter;
19
20use super::SemanticMemory;
21
/// Optional hook applied to the raw extraction result before anything is
/// persisted; returning `Err(reason)` causes the upsert to be skipped.
pub type PostExtractValidator = Option<Box<dyn Fn(&ExtractorResult) -> Result<(), String> + Send>>;
27
/// Tuning knobs for one graph-extraction pass and the background
/// maintenance that follows it (community refresh, eviction, note linking,
/// link-weight decay).
#[derive(Debug, Clone)]
pub struct GraphExtractionConfig {
    /// Upper bound on entities requested from the extractor.
    pub max_entities: usize,
    /// Upper bound on edges requested from the extractor.
    pub max_edges: usize,
    /// Timeout for one extract-and-store pass, in seconds.
    pub extraction_timeout_secs: u64,
    /// Run community detection every N extractions (0 disables it).
    pub community_refresh_interval: usize,
    /// Retention window for expired edges before eviction, in days.
    pub expired_edge_retention_days: u32,
    /// Hard cap on stored entities enforced by graph eviction.
    pub max_entities_cap: usize,
    /// Byte budget for a community-summary prompt.
    pub community_summary_max_prompt_bytes: usize,
    /// Concurrency for community-summary generation.
    pub community_summary_concurrency: usize,
    /// Edge chunk size for community detection (LPA).
    pub lpa_edge_chunk_size: usize,
    /// Settings for similarity-based note linking between entities.
    pub note_linking: NoteLinkingConfig,
    /// Decay factor applied to edge retrieval counts.
    pub link_weight_decay_lambda: f64,
    /// Minimum interval between decay passes, in seconds.
    pub link_weight_decay_interval_secs: u64,
    /// When true, new edges go through belief revision.
    pub belief_revision_enabled: bool,
    /// Similarity threshold used by belief revision.
    pub belief_revision_similarity_threshold: f32,
    /// Conversation whose episode extracted entities are linked to, if any.
    pub conversation_id: Option<i64>,
}
57
impl Default for GraphExtractionConfig {
    /// Zeroed baseline: limits, timeouts, and intervals start at 0 and are
    /// expected to be overridden from application config (decay and belief
    /// revision carry usable defaults).
    ///
    /// NOTE(review): a zero `extraction_timeout_secs` yields a zero-length
    /// timeout in the extraction task — callers must set a real value.
    fn default() -> Self {
        Self {
            max_entities: 0,
            max_edges: 0,
            extraction_timeout_secs: 0,
            community_refresh_interval: 0,
            expired_edge_retention_days: 0,
            max_entities_cap: 0,
            community_summary_max_prompt_bytes: 0,
            community_summary_concurrency: 0,
            lpa_edge_chunk_size: 0,
            note_linking: NoteLinkingConfig::default(),
            link_weight_decay_lambda: 0.95,
            link_weight_decay_interval_secs: 86400,
            belief_revision_enabled: false,
            belief_revision_similarity_threshold: 0.85,
            conversation_id: None,
        }
    }
}
79
/// Counters produced by one extract-and-store pass.
#[derive(Debug, Default)]
pub struct ExtractionStats {
    /// Entities successfully resolved/upserted.
    pub entities_upserted: usize,
    /// Edges successfully inserted.
    pub edges_inserted: usize,
}
86
/// Outcome of [`extract_and_store`]: counters plus the ids of every entity
/// resolved in the pass (consumed by episode linking and note linking).
#[derive(Debug, Default)]
pub struct ExtractionResult {
    pub stats: ExtractionStats,
    /// Ids of all entities resolved in this pass (new or pre-existing).
    pub entity_ids: Vec<i64>,
}
94
/// Counters produced by note linking.
#[derive(Debug, Default)]
pub struct LinkingStats {
    /// Entities whose similarity search completed successfully.
    pub entities_processed: usize,
    /// `similar_to` edges inserted.
    pub edges_created: usize,
}
101
/// Vector-store collection holding one embedding point per graph entity.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";
104
/// Per-entity unit of work for note linking.
struct EntityWorkItem {
    entity_id: i64,
    canonical_name: String,
    /// Text sent to the embedding provider ("name" or "name: summary").
    embed_text: String,
    /// The entity's own vector point id, used to filter self-matches.
    self_point_id: Option<String>,
}
112
113pub async fn link_memory_notes(
129 entity_ids: &[i64],
130 pool: DbPool,
131 embedding_store: Arc<EmbeddingStore>,
132 provider: AnyProvider,
133 cfg: &NoteLinkingConfig,
134) -> LinkingStats {
135 use crate::graph::GraphStore;
136
137 let store = GraphStore::new(pool);
138 let mut stats = LinkingStats::default();
139
140 let work_items = collect_note_link_work_items(entity_ids, &store).await;
141 if work_items.is_empty() {
142 return stats;
143 }
144
145 let valid = embed_work_items(&work_items, &provider).await;
146
147 let search_limit = cfg.top_k + 1; let search_results = search_similar_for_items(&valid, &embedding_store, search_limit).await;
149
150 insert_similarity_edges(
151 &work_items,
152 &valid,
153 &search_results,
154 cfg,
155 &store,
156 &mut stats,
157 )
158 .await;
159
160 stats
161}
162
163async fn collect_note_link_work_items(
167 entity_ids: &[i64],
168 store: &crate::graph::GraphStore,
169) -> Vec<EntityWorkItem> {
170 let mut work_items: Vec<EntityWorkItem> = Vec::with_capacity(entity_ids.len());
171 for &entity_id in entity_ids {
172 let entity = match store.find_entity_by_id(entity_id).await {
173 Ok(Some(e)) => e,
174 Ok(None) => {
175 tracing::debug!("note_linking: entity {entity_id} not found, skipping");
176 continue;
177 }
178 Err(e) => {
179 tracing::debug!("note_linking: DB error loading entity {entity_id}: {e:#}");
180 continue;
181 }
182 };
183 let embed_text = match &entity.summary {
184 Some(s) if !s.is_empty() => format!("{}: {s}", entity.canonical_name),
185 _ => entity.canonical_name.clone(),
186 };
187 work_items.push(EntityWorkItem {
188 entity_id,
189 canonical_name: entity.canonical_name,
190 embed_text,
191 self_point_id: entity.qdrant_point_id,
192 });
193 }
194 work_items
195}
196
197async fn embed_work_items(
202 work_items: &[EntityWorkItem],
203 provider: &AnyProvider,
204) -> Vec<(usize, Vec<f32>)> {
205 use futures::future;
206
207 let embed_results: Vec<_> =
208 future::join_all(work_items.iter().map(|w| provider.embed(&w.embed_text))).await;
209
210 embed_results
211 .into_iter()
212 .enumerate()
213 .filter_map(|(i, r)| match r {
214 Ok(v) => Some((i, v)),
215 Err(e) => {
216 tracing::debug!(
217 "note_linking: embed failed for entity {:?}: {e:#}",
218 work_items[i].canonical_name
219 );
220 None
221 }
222 })
223 .collect()
224}
225
226async fn search_similar_for_items(
228 valid: &[(usize, Vec<f32>)],
229 embedding_store: &EmbeddingStore,
230 search_limit: usize,
231) -> Vec<Result<Vec<crate::ScoredVectorPoint>, MemoryError>> {
232 use futures::future;
233
234 future::join_all(valid.iter().map(|(_, vec)| {
235 embedding_store.search_collection(
236 ENTITY_COLLECTION,
237 vec,
238 search_limit,
239 None::<VectorFilter>,
240 )
241 }))
242 .await
243}
244
/// Inserts `similar_to` edges for each work item based on its vector
/// search results, deduplicating undirected pairs within this batch.
async fn insert_similarity_edges(
    work_items: &[EntityWorkItem],
    valid: &[(usize, Vec<f32>)],
    search_results: &[Result<Vec<crate::ScoredVectorPoint>, MemoryError>],
    cfg: &NoteLinkingConfig,
    store: &crate::graph::GraphStore,
    stats: &mut LinkingStats,
) {
    // Undirected-pair dedup: stored as (min, max) so A-B and B-A collapse.
    let mut seen_pairs = std::collections::HashSet::new();

    // `search_results` was produced from `valid` in order, so zip is aligned.
    for ((work_idx, _), search_result) in valid.iter().zip(search_results.iter()) {
        let w = &work_items[*work_idx];

        let results = match search_result {
            Ok(r) => r,
            Err(e) => {
                tracing::debug!(
                    "note_linking: search failed for entity {:?}: {e:#}",
                    w.canonical_name
                );
                continue;
            }
        };

        // Only counted once the search succeeded.
        stats.entities_processed += 1;

        // Drop the entity's own point and anything below the similarity
        // threshold, then keep at most top_k candidates.
        let self_point_id = w.self_point_id.as_deref();
        let candidates = results
            .iter()
            .filter(|p| Some(p.id.as_str()) != self_point_id && p.score >= cfg.similarity_threshold)
            .take(cfg.top_k);

        for point in candidates {
            let Some(target_id) = point
                .payload
                .get("entity_id")
                .and_then(serde_json::Value::as_i64)
            else {
                tracing::debug!(
                    "note_linking: missing entity_id in payload for point {}",
                    point.id
                );
                continue;
            };

            // Secondary self-match guard by id; the point-id filter above
            // cannot catch this when self_point_id is None.
            if target_id == w.entity_id {
                continue; }

            // Normalize the pair so the dedup set is order-insensitive.
            let (src, tgt) = if w.entity_id < target_id {
                (w.entity_id, target_id)
            } else {
                (target_id, w.entity_id)
            };

            if !seen_pairs.insert((src, tgt)) {
                continue;
            }

            let fact = format!("Semantically similar entities (score: {:.3})", point.score);

            match store
                .insert_edge(src, tgt, "similar_to", &fact, point.score, None)
                .await
            {
                Ok(_) => stats.edges_created += 1,
                Err(e) => {
                    tracing::debug!("note_linking: insert_edge failed: {e:#}");
                }
            }
        }
    }
}
323
/// Runs LLM graph extraction on `content`, upserts the resulting entities
/// and edges, and links them to the configured conversation's episode.
///
/// Returns an empty result when the extractor yields nothing or the
/// optional post-extract validator rejects the output.
///
/// # Errors
/// Propagates extractor failures and database errors (metadata bump, WAL
/// checkpoint).
#[cfg_attr(
    feature = "profiling",
    tracing::instrument(name = "memory.graph_extract", skip_all, fields(entities = tracing::field::Empty, edges = tracing::field::Empty))
)]
pub async fn extract_and_store(
    content: String,
    context_messages: Vec<String>,
    provider: AnyProvider,
    pool: DbPool,
    config: GraphExtractionConfig,
    post_extract_validator: PostExtractValidator,
    embedding_store: Option<Arc<EmbeddingStore>>,
) -> Result<ExtractionResult, MemoryError> {
    use crate::graph::{EntityResolver, GraphExtractor, GraphStore};

    let extractor = GraphExtractor::new(provider.clone(), config.max_entities, config.max_edges);
    let ctx_refs: Vec<&str> = context_messages.iter().map(String::as_str).collect();

    let store = GraphStore::new(pool);

    // Counted before extraction runs, so the persistent counter reflects
    // attempts rather than successful passes.
    bump_extraction_count(store.pool()).await?;

    let Some(result) = extractor.extract(&content, &ctx_refs).await? else {
        return Ok(ExtractionResult::default());
    };

    if let Some(ref validator) = post_extract_validator
        && let Err(reason) = validator(&result)
    {
        tracing::warn!(
            reason,
            "graph extraction validation failed, skipping upsert"
        );
        return Ok(ExtractionResult::default());
    }

    // Embedding-backed resolution is optional; without an embedding store
    // the resolver is built against the graph store alone.
    let resolver = if let Some(ref emb) = embedding_store {
        EntityResolver::new(&store)
            .with_embedding_store(emb)
            .with_provider(&provider)
    } else {
        EntityResolver::new(&store)
    };

    let (entity_name_to_id, entities_upserted) = upsert_entities(&resolver, &result.entities).await;
    let edges_inserted = insert_edges(&resolver, &result.edges, &entity_name_to_id, &config).await;

    store.checkpoint_wal().await?;

    let new_entity_ids: Vec<i64> = entity_name_to_id.into_values().collect();

    link_episode(&store, &config, &new_entity_ids).await;

    #[cfg(feature = "profiling")]
    {
        let span = tracing::Span::current();
        span.record("entities", entities_upserted);
        span.record("edges", edges_inserted);
    }

    Ok(ExtractionResult {
        stats: ExtractionStats {
            entities_upserted,
            edges_inserted,
        },
        entity_ids: new_entity_ids,
    })
}
404
/// Increments the persistent `extraction_count` counter in `graph_metadata`.
///
/// Seeds the row with '0' on first use, then increments via text->int->text
/// casts since the metadata value column is text.
///
/// # Errors
/// Returns any database error from either statement.
async fn bump_extraction_count(pool: &DbPool) -> Result<(), MemoryError> {
    zeph_db::query(sql!(
        "INSERT INTO graph_metadata (key, value) VALUES ('extraction_count', '0')
         ON CONFLICT(key) DO NOTHING"
    ))
    .execute(pool)
    .await?;
    zeph_db::query(sql!(
        "UPDATE graph_metadata
         SET value = CAST(CAST(value AS INTEGER) + 1 AS TEXT)
         WHERE key = 'extraction_count'"
    ))
    .execute(pool)
    .await?;
    Ok(())
}
422
423async fn upsert_entities(
425 resolver: &crate::graph::EntityResolver<'_>,
426 entities: &[crate::graph::extractor::ExtractedEntity],
427) -> (std::collections::HashMap<String, i64>, usize) {
428 let mut entity_name_to_id: std::collections::HashMap<String, i64> =
429 std::collections::HashMap::new();
430 let mut entities_upserted = 0usize;
431
432 for entity in entities {
433 match resolver
434 .resolve(&entity.name, &entity.entity_type, entity.summary.as_deref())
435 .await
436 {
437 Ok((id, _outcome)) => {
438 entity_name_to_id.insert(entity.name.clone(), id);
439 entities_upserted += 1;
440 }
441 Err(e) => {
442 tracing::debug!("graph: skipping entity {:?}: {e:#}", entity.name);
443 }
444 }
445 }
446
447 (entity_name_to_id, entities_upserted)
448}
449
/// Inserts extracted edges between entities resolved in the entity pass.
///
/// Edges with unresolved endpoints and self-loops are skipped with debug
/// logs; unknown edge types fall back to `Semantic` with a warning.
/// Returns the number of edges actually inserted.
async fn insert_edges(
    resolver: &crate::graph::EntityResolver<'_>,
    edges: &[crate::graph::extractor::ExtractedEdge],
    name_to_id: &std::collections::HashMap<String, i64>,
    config: &GraphExtractionConfig,
) -> usize {
    let mut edges_inserted = 0usize;
    for edge in edges {
        // Both endpoints must have been resolved during upsert_entities.
        let (Some(&src_id), Some(&tgt_id)) =
            (name_to_id.get(&edge.source), name_to_id.get(&edge.target))
        else {
            tracing::debug!(
                "graph: skipping edge {:?}->{:?}: entity not resolved",
                edge.source,
                edge.target
            );
            continue;
        };
        if src_id == tgt_id {
            tracing::debug!(
                "graph: skipping self-loop edge {:?}->{:?} (entity_id={src_id})",
                edge.source,
                edge.target
            );
            continue;
        }
        // The edge type string comes from the LLM; tolerate unknown values.
        let edge_type = edge
            .edge_type
            .parse::<crate::graph::EdgeType>()
            .unwrap_or_else(|_| {
                tracing::warn!(
                    raw_type = %edge.edge_type,
                    "graph: unknown edge_type from LLM, defaulting to semantic"
                );
                crate::graph::EdgeType::Semantic
            });
        let belief_cfg =
            config
                .belief_revision_enabled
                .then_some(crate::graph::BeliefRevisionConfig {
                    similarity_threshold: config.belief_revision_similarity_threshold,
                });
        match resolver
            .resolve_edge_typed(
                src_id,
                tgt_id,
                &edge.relation,
                &edge.fact,
                0.8,
                None,
                edge_type,
                belief_cfg.as_ref(),
            )
            .await
        {
            Ok(Some(_)) => edges_inserted += 1,
            // Ok(None): the resolver chose not to insert; not counted and
            // not an error (exact reason lives in the resolver).
            Ok(None) => {} Err(e) => {
                tracing::debug!("graph: skipping edge: {e:#}");
            }
        }
    }
    edges_inserted
}
519
520async fn link_episode(
522 store: &crate::graph::GraphStore,
523 config: &GraphExtractionConfig,
524 entity_ids: &[i64],
525) {
526 let Some(conv_id) = config.conversation_id else {
527 return;
528 };
529 match store.ensure_episode(conv_id).await {
530 Ok(episode_id) => {
531 for &entity_id in entity_ids {
532 if let Err(e) = store.link_entity_to_episode(episode_id, entity_id).await {
533 tracing::debug!("episode linking skipped for entity {entity_id}: {e:#}");
534 }
535 }
536 }
537 Err(e) => {
538 tracing::warn!("failed to ensure episode for conversation {conv_id}: {e:#}");
539 }
540 }
541}
542
impl SemanticMemory {
    /// Spawns graph extraction as a detached Tokio task and returns its
    /// handle.
    ///
    /// The task captures clones of the pool, provider, metric counters,
    /// and the optional embedding store; the caller does not need to await
    /// the handle for the work to proceed.
    pub fn spawn_graph_extraction(
        &self,
        content: String,
        context_messages: Vec<String>,
        config: GraphExtractionConfig,
        post_extract_validator: PostExtractValidator,
    ) -> tokio::task::JoinHandle<()> {
        let ctx = GraphExtractionTaskCtx {
            pool: self.sqlite.pool().clone(),
            provider: self.provider.clone(),
            failure_counter: self.community_detection_failures.clone(),
            extraction_count: self.graph_extraction_count.clone(),
            extraction_failures: self.graph_extraction_failures.clone(),
            embedding_store: self.qdrant.clone(),
        };

        tokio::spawn(run_graph_extraction_task(
            content,
            context_messages,
            config,
            post_extract_validator,
            ctx,
        ))
    }
}
580
/// Everything a spawned extraction task needs, captured from
/// `SemanticMemory` at spawn time.
struct GraphExtractionTaskCtx {
    pool: DbPool,
    provider: AnyProvider,
    /// Counter bumped when community detection fails.
    failure_counter: Arc<std::sync::atomic::AtomicU64>,
    /// Counter bumped on each successful extraction.
    extraction_count: Arc<std::sync::atomic::AtomicU64>,
    /// Counter bumped on failed or timed-out extractions.
    extraction_failures: Arc<std::sync::atomic::AtomicU64>,
    embedding_store: Option<Arc<EmbeddingStore>>,
}
592
593async fn run_graph_extraction_task(
595 content: String,
596 context_messages: Vec<String>,
597 config: GraphExtractionConfig,
598 post_extract_validator: PostExtractValidator,
599 ctx: GraphExtractionTaskCtx,
600) {
601 let timeout_dur = std::time::Duration::from_secs(config.extraction_timeout_secs);
602 let extraction_result = tokio::time::timeout(
603 timeout_dur,
604 extract_and_store(
605 content,
606 context_messages,
607 ctx.provider.clone(),
608 ctx.pool.clone(),
609 config.clone(),
610 post_extract_validator,
611 ctx.embedding_store.clone(),
612 ),
613 )
614 .await;
615
616 let (extraction_ok, new_entity_ids) = match extraction_result {
617 Ok(Ok(result)) => {
618 tracing::debug!(
619 entities = result.stats.entities_upserted,
620 edges = result.stats.edges_inserted,
621 "graph extraction completed"
622 );
623 ctx.extraction_count.fetch_add(1, Ordering::Relaxed);
624 (true, result.entity_ids)
625 }
626 Ok(Err(e)) => {
627 tracing::warn!("graph extraction failed: {e:#}");
628 ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
629 (false, vec![])
630 }
631 Err(_elapsed) => {
632 tracing::warn!("graph extraction timed out");
633 ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
634 (false, vec![])
635 }
636 };
637
638 run_note_linking(
639 extraction_ok,
640 &new_entity_ids,
641 ctx.pool.clone(),
642 ctx.embedding_store,
643 ctx.provider.clone(),
644 &config,
645 )
646 .await;
647
648 maybe_refresh_communities(
649 extraction_ok,
650 ctx.pool,
651 ctx.provider,
652 ctx.failure_counter,
653 &config,
654 )
655 .await;
656}
657
658async fn run_note_linking(
660 extraction_ok: bool,
661 new_entity_ids: &[i64],
662 pool: DbPool,
663 embedding_store: Option<Arc<EmbeddingStore>>,
664 provider: AnyProvider,
665 config: &GraphExtractionConfig,
666) {
667 if !extraction_ok || !config.note_linking.enabled || new_entity_ids.is_empty() {
668 return;
669 }
670 let Some(store) = embedding_store else {
671 return;
672 };
673 let linking_timeout = std::time::Duration::from_secs(config.note_linking.timeout_secs);
674 match tokio::time::timeout(
675 linking_timeout,
676 link_memory_notes(new_entity_ids, pool, store, provider, &config.note_linking),
677 )
678 .await
679 {
680 Ok(stats) => {
681 tracing::debug!(
682 entities_processed = stats.entities_processed,
683 edges_created = stats.edges_created,
684 "note linking completed"
685 );
686 }
687 Err(_elapsed) => {
688 tracing::debug!("note linking timed out (partial edges may exist)");
689 }
690 }
691}
692
/// After a successful extraction, spawns background community detection,
/// graph eviction, and (rate-limited) link-weight decay whenever the
/// persistent extraction counter hits a multiple of
/// `community_refresh_interval`.
async fn maybe_refresh_communities(
    extraction_ok: bool,
    pool: DbPool,
    provider: AnyProvider,
    failure_counter: Arc<std::sync::atomic::AtomicU64>,
    config: &GraphExtractionConfig,
) {
    use crate::graph::GraphStore;

    // An interval of 0 disables refresh entirely.
    if !extraction_ok || config.community_refresh_interval == 0 {
        return;
    }

    let store = GraphStore::new(pool.clone());
    let extraction_count = store.extraction_count().await.unwrap_or(0);
    // Only fire on exact multiples of the refresh interval.
    if extraction_count == 0
        || !i64::try_from(config.community_refresh_interval)
            .is_ok_and(|interval| extraction_count % interval == 0)
    {
        return;
    }

    tracing::info!(extraction_count, "triggering community detection refresh");
    // Copy the needed config values out so the spawned task is 'static.
    let store2 = GraphStore::new(pool);
    let provider2 = provider;
    let retention_days = config.expired_edge_retention_days;
    let max_cap = config.max_entities_cap;
    let max_prompt_bytes = config.community_summary_max_prompt_bytes;
    let concurrency = config.community_summary_concurrency;
    let edge_chunk_size = config.lpa_edge_chunk_size;
    let decay_lambda = config.link_weight_decay_lambda;
    let decay_interval_secs = config.link_weight_decay_interval_secs;
    tokio::spawn(async move {
        match crate::graph::community::detect_communities(
            &store2,
            &provider2,
            max_prompt_bytes,
            concurrency,
            edge_chunk_size,
        )
        .await
        {
            Ok(count) => {
                tracing::info!(communities = count, "community detection complete");
            }
            Err(e) => {
                tracing::warn!("community detection failed: {e:#}");
                failure_counter.fetch_add(1, Ordering::Relaxed);
            }
        }
        // Eviction runs even if community detection failed.
        match crate::graph::community::run_graph_eviction(&store2, retention_days, max_cap).await {
            Ok(stats) => {
                tracing::info!(
                    expired_edges = stats.expired_edges_deleted,
                    orphan_entities = stats.orphan_entities_deleted,
                    capped_entities = stats.capped_entities_deleted,
                    "graph eviction complete"
                );
            }
            Err(e) => {
                tracing::warn!("graph eviction failed: {e:#}");
            }
        }

        // Link-weight decay is rate-limited via a metadata timestamp so it
        // runs at most once per decay interval.
        if decay_lambda > 0.0 && decay_interval_secs > 0 {
            let now_secs = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map_or(0, |d| d.as_secs());
            let last_decay = store2
                .get_metadata("last_link_weight_decay_at")
                .await
                .ok()
                .flatten()
                .and_then(|s| s.parse::<u64>().ok())
                .unwrap_or(0);
            if now_secs.saturating_sub(last_decay) >= decay_interval_secs {
                match store2
                    .decay_edge_retrieval_counts(decay_lambda, decay_interval_secs)
                    .await
                {
                    Ok(affected) => {
                        tracing::info!(affected, "link weight decay applied");
                        // Best-effort timestamp write: failure only means
                        // decay may run again sooner than intended.
                        let _ = store2
                            .set_metadata("last_link_weight_decay_at", &now_secs.to_string())
                            .await;
                    }
                    Err(e) => {
                        tracing::warn!("link weight decay failed: {e:#}");
                    }
                }
            }
        }
    });
}
790
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use zeph_llm::any::AnyProvider;

    use super::extract_and_store;
    use crate::embedding_store::EmbeddingStore;
    use crate::graph::GraphStore;
    use crate::in_memory_store::InMemoryVectorStore;
    use crate::store::SqliteStore;

    use super::GraphExtractionConfig;

    /// Builds an in-memory SQLite graph store plus an in-memory vector
    /// store for tests.
    async fn setup() -> (GraphStore, Arc<EmbeddingStore>) {
        let sqlite = SqliteStore::new(":memory:").await.unwrap();
        let pool = sqlite.pool().clone();
        let mem_store = Box::new(InMemoryVectorStore::new());
        let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool.clone()));
        let gs = GraphStore::new(pool);
        (gs, emb)
    }

    // Regression for #1829: when both an embedding store and an
    // embedding-capable provider are supplied, resolved entities must get
    // a vector point id persisted.
    #[tokio::test]
    async fn extract_and_store_sets_qdrant_point_id_when_embedding_store_provided() {
        let (gs, emb) = setup().await;

        let extraction_json = r#"{"entities":[{"name":"Rust","type":"language","summary":"systems language"}],"edges":[]}"#;
        let mut mock =
            zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
        mock.supports_embeddings = true;
        mock.embedding = vec![1.0_f32, 0.0, 0.0, 0.0];
        let provider = AnyProvider::Mock(mock);

        let config = GraphExtractionConfig {
            max_entities: 10,
            max_edges: 10,
            extraction_timeout_secs: 10,
            ..Default::default()
        };

        let result = extract_and_store(
            "Rust is a systems programming language.".to_owned(),
            vec![],
            provider,
            gs.pool().clone(),
            config,
            None,
            Some(emb.clone()),
        )
        .await
        .unwrap();

        assert_eq!(
            result.stats.entities_upserted, 1,
            "one entity should be upserted"
        );

        // Entity names are stored canonicalized (lowercase).
        let entity = gs
            .find_entity("rust", crate::graph::EntityType::Language)
            .await
            .unwrap()
            .expect("entity 'rust' must exist in SQLite");

        assert!(
            entity.qdrant_point_id.is_some(),
            "qdrant_point_id must be set when embedding_store + provider are both provided (regression for #1829)"
        );
    }

    // Without an embedding store, extraction still works but no vector
    // point id is recorded.
    #[tokio::test]
    async fn extract_and_store_without_embedding_store_still_upserts_entities() {
        let (gs, _emb) = setup().await;

        let extraction_json = r#"{"entities":[{"name":"Python","type":"language","summary":"scripting"}],"edges":[]}"#;
        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
        let provider = AnyProvider::Mock(mock);

        let config = GraphExtractionConfig {
            max_entities: 10,
            max_edges: 10,
            extraction_timeout_secs: 10,
            ..Default::default()
        };

        let result = extract_and_store(
            "Python is a scripting language.".to_owned(),
            vec![],
            provider,
            gs.pool().clone(),
            config,
            None,
            None, )
        .await
        .unwrap();

        assert_eq!(result.stats.entities_upserted, 1);

        let entity = gs
            .find_entity("python", crate::graph::EntityType::Language)
            .await
            .unwrap()
            .expect("entity 'python' must exist");

        assert!(
            entity.qdrant_point_id.is_none(),
            "qdrant_point_id must remain None when no embedding_store is provided"
        );
    }

    // Regression for #2166: FTS5 data written in one connection/session
    // must be visible from a fresh connection to the same database file.
    #[tokio::test]
    async fn extract_and_store_fts5_cross_session_visibility() {
        let file = tempfile::NamedTempFile::new().expect("tempfile");
        let path = file.path().to_str().expect("valid path").to_string();

        // Session A: extract and drop the store at end of scope.
        {
            let sqlite = crate::store::SqliteStore::new(&path).await.unwrap();
            let extraction_json = r#"{"entities":[{"name":"Ferris","type":"concept","summary":"Rust mascot"}],"edges":[]}"#;
            let mock =
                zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
            let provider = AnyProvider::Mock(mock);
            let config = GraphExtractionConfig {
                max_entities: 10,
                max_edges: 10,
                extraction_timeout_secs: 10,
                ..Default::default()
            };
            extract_and_store(
                "Ferris is the Rust mascot.".to_owned(),
                vec![],
                provider,
                sqlite.pool().clone(),
                config,
                None,
                None,
            )
            .await
            .unwrap();
        }

        // Session B: fresh connection against the same file.
        let sqlite_b = crate::store::SqliteStore::new(&path).await.unwrap();
        let gs_b = crate::graph::GraphStore::new(sqlite_b.pool().clone());
        let results = gs_b.find_entities_fuzzy("Ferris", 10).await.unwrap();
        assert!(
            !results.is_empty(),
            "FTS5 cross-session (#2166): entity extracted in session A must be visible in session B"
        );
    }

    // Regression for #2215: the extractor output may contain self-loop
    // edges; insert_edges must drop them.
    #[tokio::test]
    async fn extract_and_store_skips_self_loop_edges() {
        let (gs, _emb) = setup().await;

        let extraction_json = r#"{
            "entities":[{"name":"Rust","type":"language","summary":"systems language"}],
            "edges":[{"source":"Rust","target":"Rust","relation":"is","fact":"Rust is Rust","edge_type":"semantic"}]
        }"#;
        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
        let provider = AnyProvider::Mock(mock);

        let config = GraphExtractionConfig {
            max_entities: 10,
            max_edges: 10,
            extraction_timeout_secs: 10,
            ..Default::default()
        };

        let result = extract_and_store(
            "Rust is a language.".to_owned(),
            vec![],
            provider,
            gs.pool().clone(),
            config,
            None,
            None,
        )
        .await
        .unwrap();

        assert_eq!(result.stats.entities_upserted, 1);
        assert_eq!(
            result.stats.edges_inserted, 0,
            "self-loop edge must be rejected (#2215)"
        );
    }
}