1use std::sync::Arc;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use std::sync::atomic::Ordering;
9use zeph_db::DbPool;
10
11pub use zeph_common::config::memory::NoteLinkingConfig;
12use zeph_common::sanitize::strip_control_chars;
13use zeph_common::text::truncate_to_bytes_ref;
14use zeph_llm::any::AnyProvider;
15use zeph_llm::provider::LlmProvider as _;
16
17use crate::embedding_store::EmbeddingStore;
18use crate::error::MemoryError;
19use crate::graph::extractor::ExtractionResult as ExtractorResult;
20use crate::vector_store::VectorFilter;
21
22use super::SemanticMemory;
23
/// Optional hook that inspects the raw extractor output before anything is persisted.
///
/// `extract_and_store` invokes the closure with the extraction result; when it returns
/// `Err(reason)` a warning is logged and the upsert is skipped. `None` disables validation.
pub type PostExtractValidator = Option<Box<dyn Fn(&ExtractorResult) -> Result<(), String> + Send>>;
29
/// Settings for one graph-extraction run and the follow-up maintenance it may trigger
/// (note linking, community refresh, eviction, link-weight decay).
#[derive(Debug, Clone)]
pub struct GraphExtractionConfig {
    /// Entity limit handed to `GraphExtractor::new`.
    pub max_entities: usize,
    /// Edge limit handed to `GraphExtractor::new`.
    pub max_edges: usize,
    /// Timeout, in seconds, wrapped around the whole `extract_and_store` call by the
    /// background task.
    pub extraction_timeout_secs: u64,
    /// Community detection is triggered whenever the persisted extraction count is a
    /// multiple of this value; `0` disables the refresh.
    pub community_refresh_interval: usize,
    /// Retention window (days) forwarded to `run_graph_eviction`.
    pub expired_edge_retention_days: u32,
    /// Entity cap forwarded to `run_graph_eviction`.
    pub max_entities_cap: usize,
    /// Prompt byte budget forwarded to `detect_communities`.
    pub community_summary_max_prompt_bytes: usize,
    /// Concurrency setting forwarded to `detect_communities`.
    pub community_summary_concurrency: usize,
    /// Edge chunk size forwarded to `detect_communities`.
    pub lpa_edge_chunk_size: usize,
    /// Note-linking settings (`enabled`, `timeout_secs`, `top_k`, `similarity_threshold`
    /// are read in this file).
    pub note_linking: NoteLinkingConfig,
    /// Lambda forwarded to `decay_edge_retrieval_counts`; decay is skipped unless it is `> 0.0`.
    pub link_weight_decay_lambda: f64,
    /// Minimum number of seconds between two decay passes; decay is skipped when `0`.
    pub link_weight_decay_interval_secs: u64,
    /// When `true` (and `apex_mem_enabled` is `false`), a `BeliefRevisionConfig` is passed
    /// to `resolve_edge_typed`.
    pub belief_revision_enabled: bool,
    /// `similarity_threshold` of that `BeliefRevisionConfig`.
    pub belief_revision_similarity_threshold: f32,
    /// When set, resolved entities are linked to the episode of this conversation.
    pub conversation_id: Option<i64>,
    /// Routes edges through `insert_or_supersede` with sanitized, truncated relation and
    /// fact strings instead of `resolve_edge_typed`.
    pub apex_mem_enabled: bool,
    /// Timeout value (seconds) handed to `GraphExtractor::new` — presumably applied per LLM
    /// call; confirm against the extractor.
    pub llm_timeout_secs: u64,
}
63
64impl Default for GraphExtractionConfig {
65 fn default() -> Self {
66 Self {
67 max_entities: 0,
68 max_edges: 0,
69 extraction_timeout_secs: 0,
70 community_refresh_interval: 0,
71 expired_edge_retention_days: 0,
72 max_entities_cap: 0,
73 community_summary_max_prompt_bytes: 0,
74 community_summary_concurrency: 0,
75 lpa_edge_chunk_size: 0,
76 note_linking: NoteLinkingConfig::default(),
77 link_weight_decay_lambda: 0.95,
78 link_weight_decay_interval_secs: 86400,
79 belief_revision_enabled: false,
80 belief_revision_similarity_threshold: 0.85,
81 conversation_id: None,
82 apex_mem_enabled: false,
83 llm_timeout_secs: 30,
84 }
85 }
86}
87
/// Counters describing what a single extraction wrote to the graph.
#[derive(Debug, Default)]
pub struct ExtractionStats {
    /// Number of extracted entities that were successfully resolved.
    pub entities_upserted: usize,
    /// Number of extracted edges that were stored.
    pub edges_inserted: usize,
}
94
/// Outcome of `extract_and_store`.
#[derive(Debug, Default)]
pub struct ExtractionResult {
    /// Entity and edge counters for this run.
    pub stats: ExtractionStats,
    /// Database ids of the entities resolved during this run.
    pub entity_ids: Vec<i64>,
}
102
/// Counters reported by `link_memory_notes`.
#[derive(Debug, Default)]
pub struct LinkingStats {
    /// Entities whose similarity search completed successfully.
    pub entities_processed: usize,
    /// `similar_to` edges that were inserted.
    pub edges_created: usize,
}
109
/// Name of the vector collection searched for similar entities during note linking.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Byte cap applied to relation strings on the APEX-MEM edge path.
const MAX_RELATION_BYTES: usize = 256;
/// Byte cap applied to edge facts on the APEX-MEM edge path.
const MAX_FACT_BYTES: usize = 2048;
117
/// Per-entity input for the note-linking pipeline.
struct EntityWorkItem {
    /// Database id of the entity being linked.
    entity_id: i64,
    /// Canonical name, used in the embed text and in log messages.
    canonical_name: String,
    /// Text sent to the embedding provider: `"<name>: <summary>"` when a non-empty summary
    /// exists, otherwise just the name.
    embed_text: String,
    /// The entity's own vector point id, if any; used to drop the self-match from search hits.
    self_point_id: Option<String>,
}
125
126pub async fn link_memory_notes(
142 entity_ids: &[i64],
143 pool: DbPool,
144 embedding_store: Arc<EmbeddingStore>,
145 provider: AnyProvider,
146 cfg: &NoteLinkingConfig,
147) -> LinkingStats {
148 use crate::graph::GraphStore;
149
150 let store = GraphStore::new(pool);
151 let mut stats = LinkingStats::default();
152
153 let work_items = collect_note_link_work_items(entity_ids, &store).await;
154 if work_items.is_empty() {
155 return stats;
156 }
157
158 let valid = embed_work_items(&work_items, &provider, cfg).await;
159
160 let search_limit = cfg.top_k + 1; let search_results = search_similar_for_items(&valid, &embedding_store, search_limit).await;
162
163 insert_similarity_edges(
164 &work_items,
165 &valid,
166 &search_results,
167 cfg,
168 &store,
169 &mut stats,
170 )
171 .await;
172
173 stats
174}
175
176async fn collect_note_link_work_items(
180 entity_ids: &[i64],
181 store: &crate::graph::GraphStore,
182) -> Vec<EntityWorkItem> {
183 let mut work_items: Vec<EntityWorkItem> = Vec::with_capacity(entity_ids.len());
184 for &entity_id in entity_ids {
185 let entity = match store.find_entity_by_id(entity_id).await {
186 Ok(Some(e)) => e,
187 Ok(None) => {
188 tracing::debug!("note_linking: entity {entity_id} not found, skipping");
189 continue;
190 }
191 Err(e) => {
192 tracing::debug!("note_linking: DB error loading entity {entity_id}: {e:#}");
193 continue;
194 }
195 };
196 let embed_text = match &entity.summary {
197 Some(s) if !s.is_empty() => format!("{}: {s}", entity.canonical_name),
198 _ => entity.canonical_name.clone(),
199 };
200 work_items.push(EntityWorkItem {
201 entity_id,
202 canonical_name: entity.canonical_name,
203 embed_text,
204 self_point_id: entity.qdrant_point_id,
205 });
206 }
207 work_items
208}
209
210async fn embed_work_items(
215 work_items: &[EntityWorkItem],
216 provider: &AnyProvider,
217 cfg: &NoteLinkingConfig,
218) -> Vec<(usize, Vec<f32>)> {
219 use futures::future;
220
221 let Ok(embed_results) = tokio::time::timeout(
222 std::time::Duration::from_secs(cfg.timeout_secs),
223 future::join_all(work_items.iter().map(|w| provider.embed(&w.embed_text))),
224 )
225 .await
226 else {
227 tracing::warn!(
228 count = work_items.len(),
229 "note_linking: batch embed timed out — skipping all entities"
230 );
231 return Vec::new();
232 };
233 embed_results
234 .into_iter()
235 .enumerate()
236 .filter_map(|(i, r)| match r {
237 Ok(v) => Some((i, v)),
238 Err(e) => {
239 tracing::debug!(
240 "note_linking: embed failed for entity {:?}: {e:#}",
241 work_items[i].canonical_name
242 );
243 None
244 }
245 })
246 .collect()
247}
248
249async fn search_similar_for_items(
251 valid: &[(usize, Vec<f32>)],
252 embedding_store: &EmbeddingStore,
253 search_limit: usize,
254) -> Vec<Result<Vec<crate::ScoredVectorPoint>, MemoryError>> {
255 use futures::future;
256
257 future::join_all(valid.iter().map(|(_, vec)| {
258 embedding_store.search_collection(
259 ENTITY_COLLECTION,
260 vec,
261 search_limit,
262 None::<VectorFilter>,
263 )
264 }))
265 .await
266}
267
268async fn insert_similarity_edges(
273 work_items: &[EntityWorkItem],
274 valid: &[(usize, Vec<f32>)],
275 search_results: &[Result<Vec<crate::ScoredVectorPoint>, MemoryError>],
276 cfg: &NoteLinkingConfig,
277 store: &crate::graph::GraphStore,
278 stats: &mut LinkingStats,
279) {
280 let mut seen_pairs = std::collections::HashSet::new();
281
282 for ((work_idx, _), search_result) in valid.iter().zip(search_results.iter()) {
283 let w = &work_items[*work_idx];
284
285 let results = match search_result {
286 Ok(r) => r,
287 Err(e) => {
288 tracing::debug!(
289 "note_linking: search failed for entity {:?}: {e:#}",
290 w.canonical_name
291 );
292 continue;
293 }
294 };
295
296 stats.entities_processed += 1;
297
298 let self_point_id = w.self_point_id.as_deref();
299 let candidates = results
300 .iter()
301 .filter(|p| Some(p.id.as_str()) != self_point_id && p.score >= cfg.similarity_threshold)
302 .take(cfg.top_k);
303
304 for point in candidates {
305 let Some(target_id) = point
306 .payload
307 .get("entity_id")
308 .and_then(serde_json::Value::as_i64)
309 else {
310 tracing::debug!(
311 "note_linking: missing entity_id in payload for point {}",
312 point.id
313 );
314 continue;
315 };
316
317 if target_id == w.entity_id {
318 continue; }
320
321 let (src, tgt) = if w.entity_id < target_id {
323 (w.entity_id, target_id)
324 } else {
325 (target_id, w.entity_id)
326 };
327
328 if !seen_pairs.insert((src, tgt)) {
329 continue;
330 }
331
332 let fact = format!("Semantically similar entities (score: {:.3})", point.score);
333
334 match store
335 .insert_edge(src, tgt, "similar_to", &fact, point.score, None)
336 .await
337 {
338 Ok(_) => stats.edges_created += 1,
339 Err(e) => {
340 tracing::debug!("note_linking: insert_edge failed: {e:#}");
341 }
342 }
343 }
344 }
345}
346
347#[cfg_attr(
358 feature = "profiling",
359 tracing::instrument(name = "memory.graph_extract", skip_all, fields(entities = tracing::field::Empty, edges = tracing::field::Empty))
360)]
361pub async fn extract_and_store(
362 content: String,
363 context_messages: Vec<String>,
364 provider: AnyProvider,
365 pool: DbPool,
366 config: GraphExtractionConfig,
367 post_extract_validator: PostExtractValidator,
368 embedding_store: Option<Arc<EmbeddingStore>>,
369) -> Result<ExtractionResult, MemoryError> {
370 use crate::graph::{EntityResolver, GraphExtractor, GraphStore};
371
372 let extractor = GraphExtractor::new(
373 provider.clone(),
374 config.max_entities,
375 config.max_edges,
376 config.llm_timeout_secs,
377 );
378 let ctx_refs: Vec<&str> = context_messages.iter().map(String::as_str).collect();
379
380 let store = GraphStore::new(pool);
381
382 bump_extraction_count(store.pool()).await?;
383
384 let Some(result) = extractor.extract(&content, &ctx_refs).await? else {
385 return Ok(ExtractionResult::default());
386 };
387
388 if let Some(ref validator) = post_extract_validator
391 && let Err(reason) = validator(&result)
392 {
393 tracing::warn!(
394 reason,
395 "graph extraction validation failed, skipping upsert"
396 );
397 return Ok(ExtractionResult::default());
398 }
399
400 let resolver = if let Some(ref emb) = embedding_store {
401 EntityResolver::new(&store)
402 .with_embedding_store(emb)
403 .with_provider(&provider)
404 } else {
405 EntityResolver::new(&store)
406 };
407
408 let (entity_name_to_id, entities_upserted) = upsert_entities(&resolver, &result.entities).await;
409 let edges_inserted = insert_edges(&resolver, &result.edges, &entity_name_to_id, &config).await;
410
411 #[cfg(any(feature = "sqlite", feature = "postgres"))]
412 store.checkpoint_wal().await?;
413
414 let new_entity_ids: Vec<i64> = entity_name_to_id.into_values().collect();
415
416 link_episode(&store, &config, &new_entity_ids).await;
417
418 #[cfg(feature = "profiling")]
419 {
420 let span = tracing::Span::current();
421 span.record("entities", entities_upserted);
422 span.record("edges", edges_inserted);
423 }
424
425 Ok(ExtractionResult {
426 stats: ExtractionStats {
427 entities_upserted,
428 edges_inserted,
429 },
430 entity_ids: new_entity_ids,
431 })
432}
433
/// Increments the `extraction_count` row in `graph_metadata`.
///
/// Runs two statements: the first seeds the row with `'0'` if it does not exist yet, the
/// second adds one to the stored value (kept as text, cast to integer for the arithmetic).
///
/// # Errors
/// Propagates any database error from either statement.
async fn bump_extraction_count(pool: &DbPool) -> Result<(), MemoryError> {
    // Seed the counter so the UPDATE below always has a row to touch.
    zeph_db::query(sql!(
        "INSERT INTO graph_metadata (key, value) VALUES ('extraction_count', '0')
         ON CONFLICT(key) DO NOTHING"
    ))
    .execute(pool)
    .await?;
    zeph_db::query(sql!(
        "UPDATE graph_metadata
         SET value = CAST(CAST(value AS INTEGER) + 1 AS TEXT)
         WHERE key = 'extraction_count'"
    ))
    .execute(pool)
    .await?;
    Ok(())
}
451
452async fn upsert_entities(
454 resolver: &crate::graph::EntityResolver<'_>,
455 entities: &[crate::graph::extractor::ExtractedEntity],
456) -> (std::collections::HashMap<String, i64>, usize) {
457 let mut entity_name_to_id: std::collections::HashMap<String, i64> =
458 std::collections::HashMap::new();
459 let mut entities_upserted = 0usize;
460
461 for entity in entities {
462 match resolver
463 .resolve(&entity.name, &entity.entity_type, entity.summary.as_deref())
464 .await
465 {
466 Ok((id, _outcome)) => {
467 entity_name_to_id.insert(entity.name.clone(), id);
468 entities_upserted += 1;
469 }
470 Err(e) => {
471 tracing::debug!("graph: skipping entity {:?}: {e:#}", entity.name);
472 }
473 }
474 }
475
476 (entity_name_to_id, entities_upserted)
477}
478
479async fn insert_edges(
483 resolver: &crate::graph::EntityResolver<'_>,
484 edges: &[crate::graph::extractor::ExtractedEdge],
485 name_to_id: &std::collections::HashMap<String, i64>,
486 config: &GraphExtractionConfig,
487) -> usize {
488 let mut edges_inserted = 0usize;
489 for edge in edges {
490 let (Some(&src_id), Some(&tgt_id)) =
491 (name_to_id.get(&edge.source), name_to_id.get(&edge.target))
492 else {
493 tracing::debug!(
494 "graph: skipping edge {:?}->{:?}: entity not resolved",
495 edge.source,
496 edge.target
497 );
498 continue;
499 };
500 if src_id == tgt_id {
501 tracing::debug!(
502 "graph: skipping self-loop edge {:?}->{:?} (entity_id={src_id})",
503 edge.source,
504 edge.target
505 );
506 continue;
507 }
508 let edge_type = edge
511 .edge_type
512 .parse::<crate::graph::EdgeType>()
513 .unwrap_or_else(|_| {
514 tracing::warn!(
515 raw_type = %edge.edge_type,
516 "graph: unknown edge_type from LLM, defaulting to semantic"
517 );
518 crate::graph::EdgeType::Semantic
519 });
520 if config.apex_mem_enabled {
521 let relation_trimmed = edge.relation.trim();
523 let relation_display_clean = strip_control_chars(relation_trimmed);
524 let relation_display =
525 truncate_to_bytes_ref(&relation_display_clean, MAX_RELATION_BYTES).to_owned();
526 let canonical_clean = strip_control_chars(&relation_trimmed.to_lowercase());
527 let canonical_relation =
528 truncate_to_bytes_ref(&canonical_clean, MAX_RELATION_BYTES).to_owned();
529 let fact_clean = strip_control_chars(edge.fact.trim());
530 let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
531 match resolver
532 .graph_store()
533 .insert_or_supersede(
534 src_id,
535 tgt_id,
536 &relation_display,
537 &canonical_relation,
538 &normalized_fact,
539 0.8,
540 None,
541 edge_type,
542 true,
543 )
544 .await
545 {
546 Ok(_) => edges_inserted += 1,
547 Err(e) => {
548 tracing::debug!("graph: skipping edge (apex): {e:#}");
549 }
550 }
551 } else {
552 let belief_cfg =
553 config
554 .belief_revision_enabled
555 .then_some(crate::graph::BeliefRevisionConfig {
556 similarity_threshold: config.belief_revision_similarity_threshold,
557 });
558 match resolver
559 .resolve_edge_typed(
560 src_id,
561 tgt_id,
562 &edge.relation,
563 &edge.fact,
564 0.8,
565 None,
566 edge_type,
567 belief_cfg.as_ref(),
568 )
569 .await
570 {
571 Ok(Some(_)) => edges_inserted += 1,
572 Ok(None) => {} Err(e) => {
574 tracing::debug!("graph: skipping edge: {e:#}");
575 }
576 }
577 }
578 }
579 edges_inserted
580}
581
582async fn link_episode(
584 store: &crate::graph::GraphStore,
585 config: &GraphExtractionConfig,
586 entity_ids: &[i64],
587) {
588 let Some(conv_id) = config.conversation_id else {
589 return;
590 };
591 match store.ensure_episode(conv_id).await {
592 Ok(episode_id) => {
593 for &entity_id in entity_ids {
594 if let Err(e) = store.link_entity_to_episode(episode_id, entity_id).await {
595 tracing::debug!("episode linking skipped for entity {entity_id}: {e:#}");
596 }
597 }
598 }
599 Err(e) => {
600 tracing::warn!("failed to ensure episode for conversation {conv_id}: {e:#}");
601 }
602 }
603}
604
605impl SemanticMemory {
606 pub fn spawn_graph_extraction(
618 &self,
619 content: String,
620 context_messages: Vec<String>,
621 config: GraphExtractionConfig,
622 post_extract_validator: PostExtractValidator,
623 provider_override: Option<AnyProvider>,
624 ) -> tokio::task::JoinHandle<()> {
625 let using_override = provider_override.is_some();
626 let provider = provider_override.unwrap_or_else(|| self.provider.clone());
627 if using_override {
628 tracing::debug!(
629 extract_provider = provider.name(),
630 "graph extraction using override provider (quality_gate bypassed)"
631 );
632 }
633 let ctx = GraphExtractionTaskCtx {
634 pool: self.sqlite.pool().clone(),
635 provider,
636 failure_counter: self.community_detection_failures.clone(),
637 extraction_count: self.graph_extraction_count.clone(),
638 extraction_failures: self.graph_extraction_failures.clone(),
639 embedding_store: self.qdrant.clone(),
640 };
641
642 tokio::spawn(run_graph_extraction_task(
643 content,
644 context_messages,
645 config,
646 post_extract_validator,
647 ctx,
648 ))
649 }
650}
651
/// Shared handles a background extraction task needs, bundled to keep its signature short.
struct GraphExtractionTaskCtx {
    /// Database pool used for extraction, note linking and community refresh.
    pool: DbPool,
    /// LLM provider used for extraction, embedding and community detection.
    provider: AnyProvider,
    /// Incremented when community detection fails.
    failure_counter: Arc<std::sync::atomic::AtomicU64>,
    /// Incremented after each successful extraction.
    extraction_count: Arc<std::sync::atomic::AtomicU64>,
    /// Incremented when an extraction errors out or times out.
    extraction_failures: Arc<std::sync::atomic::AtomicU64>,
    /// Vector store handle; note linking is skipped when this is `None`.
    embedding_store: Option<Arc<EmbeddingStore>>,
}
663
664async fn run_graph_extraction_task(
666 content: String,
667 context_messages: Vec<String>,
668 config: GraphExtractionConfig,
669 post_extract_validator: PostExtractValidator,
670 ctx: GraphExtractionTaskCtx,
671) {
672 let timeout_dur = std::time::Duration::from_secs(config.extraction_timeout_secs);
673 let extraction_result = tokio::time::timeout(
674 timeout_dur,
675 extract_and_store(
676 content,
677 context_messages,
678 ctx.provider.clone(),
679 ctx.pool.clone(),
680 config.clone(),
681 post_extract_validator,
682 ctx.embedding_store.clone(),
683 ),
684 )
685 .await;
686
687 let (extraction_ok, new_entity_ids) = match extraction_result {
688 Ok(Ok(result)) => {
689 tracing::debug!(
690 entities = result.stats.entities_upserted,
691 edges = result.stats.edges_inserted,
692 "graph extraction completed"
693 );
694 ctx.extraction_count.fetch_add(1, Ordering::Relaxed);
695 (true, result.entity_ids)
696 }
697 Ok(Err(e)) => {
698 tracing::warn!("graph extraction failed: {e:#}");
699 ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
700 (false, vec![])
701 }
702 Err(_elapsed) => {
703 tracing::warn!("graph extraction timed out");
704 ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
705 (false, vec![])
706 }
707 };
708
709 run_note_linking(
710 extraction_ok,
711 &new_entity_ids,
712 ctx.pool.clone(),
713 ctx.embedding_store,
714 ctx.provider.clone(),
715 &config,
716 )
717 .await;
718
719 maybe_refresh_communities(
720 extraction_ok,
721 ctx.pool,
722 ctx.provider,
723 ctx.failure_counter,
724 &config,
725 )
726 .await;
727}
728
729async fn run_note_linking(
731 extraction_ok: bool,
732 new_entity_ids: &[i64],
733 pool: DbPool,
734 embedding_store: Option<Arc<EmbeddingStore>>,
735 provider: AnyProvider,
736 config: &GraphExtractionConfig,
737) {
738 if !extraction_ok || !config.note_linking.enabled || new_entity_ids.is_empty() {
739 return;
740 }
741 let Some(store) = embedding_store else {
742 return;
743 };
744 let linking_timeout = std::time::Duration::from_secs(config.note_linking.timeout_secs);
745 match tokio::time::timeout(
746 linking_timeout,
747 link_memory_notes(new_entity_ids, pool, store, provider, &config.note_linking),
748 )
749 .await
750 {
751 Ok(stats) => {
752 tracing::debug!(
753 entities_processed = stats.entities_processed,
754 edges_created = stats.edges_created,
755 "note linking completed"
756 );
757 }
758 Err(_elapsed) => {
759 tracing::debug!("note linking timed out (partial edges may exist)");
760 }
761 }
762}
763
764async fn maybe_refresh_communities(
767 extraction_ok: bool,
768 pool: DbPool,
769 provider: AnyProvider,
770 failure_counter: Arc<std::sync::atomic::AtomicU64>,
771 config: &GraphExtractionConfig,
772) {
773 use crate::graph::GraphStore;
774
775 if !extraction_ok || config.community_refresh_interval == 0 {
776 return;
777 }
778
779 let store = GraphStore::new(pool.clone());
780 let extraction_count = store.extraction_count().await.unwrap_or(0);
781 if extraction_count == 0
782 || !i64::try_from(config.community_refresh_interval)
783 .is_ok_and(|interval| extraction_count % interval == 0)
784 {
785 return;
786 }
787
788 tracing::info!(extraction_count, "triggering community detection refresh");
789 let store2 = GraphStore::new(pool);
790 let provider2 = provider;
791 let retention_days = config.expired_edge_retention_days;
792 let max_cap = config.max_entities_cap;
793 let max_prompt_bytes = config.community_summary_max_prompt_bytes;
794 let concurrency = config.community_summary_concurrency;
795 let edge_chunk_size = config.lpa_edge_chunk_size;
796 let decay_lambda = config.link_weight_decay_lambda;
797 let decay_interval_secs = config.link_weight_decay_interval_secs;
798 tokio::spawn(async move {
799 match crate::graph::community::detect_communities(
800 &store2,
801 &provider2,
802 max_prompt_bytes,
803 concurrency,
804 edge_chunk_size,
805 )
806 .await
807 {
808 Ok(count) => {
809 tracing::info!(communities = count, "community detection complete");
810 }
811 Err(e) => {
812 tracing::warn!("community detection failed: {e:#}");
813 failure_counter.fetch_add(1, Ordering::Relaxed);
814 }
815 }
816 match crate::graph::community::run_graph_eviction(&store2, retention_days, max_cap).await {
817 Ok(stats) => {
818 tracing::info!(
819 expired_edges = stats.expired_edges_deleted,
820 orphan_entities = stats.orphan_entities_deleted,
821 capped_entities = stats.capped_entities_deleted,
822 "graph eviction complete"
823 );
824 }
825 Err(e) => {
826 tracing::warn!("graph eviction failed: {e:#}");
827 }
828 }
829
830 if decay_lambda > 0.0 && decay_interval_secs > 0 {
832 let now_secs = std::time::SystemTime::now()
833 .duration_since(std::time::UNIX_EPOCH)
834 .map_or(0, |d| d.as_secs());
835 let last_decay = store2
836 .get_metadata("last_link_weight_decay_at")
837 .await
838 .ok()
839 .flatten()
840 .and_then(|s| s.parse::<u64>().ok())
841 .unwrap_or(0);
842 if now_secs.saturating_sub(last_decay) >= decay_interval_secs {
843 match store2
844 .decay_edge_retrieval_counts(decay_lambda, decay_interval_secs)
845 .await
846 {
847 Ok(affected) => {
848 tracing::info!(affected, "link weight decay applied");
849 let _ = store2
850 .set_metadata("last_link_weight_decay_at", &now_secs.to_string())
851 .await;
852 }
853 Err(e) => {
854 tracing::warn!("link weight decay failed: {e:#}");
855 }
856 }
857 }
858 }
859 });
860}
861
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use zeph_llm::any::AnyProvider;
    use zeph_llm::mock::MockProvider;

    use super::{GraphExtractionConfig, NoteLinkingConfig, extract_and_store};
    use crate::embedding_store::EmbeddingStore;
    use crate::graph::GraphStore;
    use crate::in_memory_store::InMemoryVectorStore;
    use crate::store::SqliteStore;

    /// Builds an in-memory SQLite graph store plus an in-memory embedding store on the
    /// same pool.
    async fn setup() -> (GraphStore, Arc<EmbeddingStore>) {
        let sqlite = SqliteStore::new(":memory:").await.unwrap();
        let pool = sqlite.pool().clone();
        let mem_store = Box::new(InMemoryVectorStore::new());
        let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool.clone()));
        let gs = GraphStore::new(pool);
        (gs, emb)
    }

    /// Extraction settings shared by every test: small limits, 10 s timeout, defaults elsewhere.
    fn test_config() -> GraphExtractionConfig {
        GraphExtractionConfig {
            max_entities: 10,
            max_edges: 10,
            extraction_timeout_secs: 10,
            ..Default::default()
        }
    }

    /// Mock provider that answers the extraction prompt with `json`.
    fn mock_with_response(json: &str) -> MockProvider {
        MockProvider::with_responses(vec![json.to_owned()])
    }

    /// With an embedding store and an embedding-capable provider, the stored entity must
    /// get a vector point id.
    #[tokio::test]
    async fn extract_and_store_sets_qdrant_point_id_when_embedding_store_provided() {
        let (gs, emb) = setup().await;

        let extraction_json = r#"{"entities":[{"name":"Rust","type":"language","summary":"systems language"}],"edges":[]}"#;
        let mut mock = mock_with_response(extraction_json);
        mock.supports_embeddings = true;
        mock.embedding = vec![1.0_f32, 0.0, 0.0, 0.0];
        let provider = AnyProvider::Mock(mock);

        let result = extract_and_store(
            "Rust is a systems programming language.".to_owned(),
            vec![],
            provider,
            gs.pool().clone(),
            test_config(),
            None,
            Some(emb.clone()),
        )
        .await
        .unwrap();

        assert_eq!(
            result.stats.entities_upserted, 1,
            "one entity should be upserted"
        );

        let entity = gs
            .find_entity("rust", crate::graph::EntityType::Language)
            .await
            .unwrap()
            .expect("entity 'rust' must exist in SQLite");

        assert!(
            entity.qdrant_point_id.is_some(),
            "qdrant_point_id must be set when embedding_store + provider are both provided (regression for #1829)"
        );
    }

    /// Without an embedding store the entity is still stored, just without a point id.
    #[tokio::test]
    async fn extract_and_store_without_embedding_store_still_upserts_entities() {
        let (gs, _emb) = setup().await;

        let extraction_json = r#"{"entities":[{"name":"Python","type":"language","summary":"scripting"}],"edges":[]}"#;
        let provider = AnyProvider::Mock(mock_with_response(extraction_json));

        let result = extract_and_store(
            "Python is a scripting language.".to_owned(),
            vec![],
            provider,
            gs.pool().clone(),
            test_config(),
            None,
            None,
        )
        .await
        .unwrap();

        assert_eq!(result.stats.entities_upserted, 1);

        let entity = gs
            .find_entity("python", crate::graph::EntityType::Language)
            .await
            .unwrap()
            .expect("entity 'python' must exist");

        assert!(
            entity.qdrant_point_id.is_none(),
            "qdrant_point_id must remain None when no embedding_store is provided"
        );
    }

    /// An entity written through one store handle must be found by fuzzy search through a
    /// second handle opened on the same database file.
    #[tokio::test]
    async fn extract_and_store_fts5_cross_session_visibility() {
        let file = tempfile::NamedTempFile::new().expect("tempfile");
        let path = file.path().to_str().expect("valid path").to_string();

        {
            // Session A: extract and store, then drop the store at the end of the scope.
            let sqlite = SqliteStore::new(&path).await.unwrap();
            let extraction_json = r#"{"entities":[{"name":"Ferris","type":"concept","summary":"Rust mascot"}],"edges":[]}"#;
            let provider = AnyProvider::Mock(mock_with_response(extraction_json));
            extract_and_store(
                "Ferris is the Rust mascot.".to_owned(),
                vec![],
                provider,
                sqlite.pool().clone(),
                test_config(),
                None,
                None,
            )
            .await
            .unwrap();
        }

        // Session B: a fresh store on the same file.
        let sqlite_b = SqliteStore::new(&path).await.unwrap();
        let gs_b = GraphStore::new(sqlite_b.pool().clone());
        let results = gs_b.find_entities_fuzzy("Ferris", 10).await.unwrap();
        assert!(
            !results.is_empty(),
            "FTS5 cross-session (#2166): entity extracted in session A must be visible in session B"
        );
    }

    /// An edge whose source and target resolve to the same entity must not be stored.
    #[tokio::test]
    async fn extract_and_store_skips_self_loop_edges() {
        let (gs, _emb) = setup().await;

        let extraction_json = r#"{
            "entities":[{"name":"Rust","type":"language","summary":"systems language"}],
            "edges":[{"source":"Rust","target":"Rust","relation":"is","fact":"Rust is Rust","edge_type":"semantic"}]
        }"#;
        let provider = AnyProvider::Mock(mock_with_response(extraction_json));

        let result = extract_and_store(
            "Rust is a language.".to_owned(),
            vec![],
            provider,
            gs.pool().clone(),
            test_config(),
            None,
            None,
        )
        .await
        .unwrap();

        assert_eq!(result.stats.entities_upserted, 1);
        assert_eq!(
            result.stats.edges_inserted, 0,
            "self-loop edge must be rejected (#2215)"
        );
    }

    /// With APEX-MEM enabled the edge is stored, counted, and keeps its original casing in
    /// the display relation.
    #[tokio::test]
    async fn apex_mem_path_inserts_edge_via_insert_or_supersede() {
        let (gs, _emb) = setup().await;

        let extraction_json = r#"{
            "entities":[
                {"name":"Alice","type":"person","summary":"a person"},
                {"name":"Bob","type":"person","summary":"another person"}
            ],
            "edges":[
                {"source":"Alice","target":"Bob","relation":"KNOWS","fact":"Alice knows Bob","edge_type":"semantic"}
            ]
        }"#;
        let provider = AnyProvider::Mock(mock_with_response(extraction_json));

        let config = GraphExtractionConfig {
            apex_mem_enabled: true,
            ..test_config()
        };

        let result = extract_and_store(
            "Alice knows Bob.".to_owned(),
            vec![],
            provider,
            gs.pool().clone(),
            config,
            None,
            None,
        )
        .await
        .unwrap();

        assert_eq!(result.stats.entities_upserted, 2, "two entities expected");
        assert_eq!(
            result.stats.edges_inserted, 1,
            "APEX-MEM path must insert the edge and count it (#3631)"
        );

        let alice_id = gs
            .find_entity("alice", crate::graph::EntityType::Person)
            .await
            .unwrap()
            .expect("entity 'alice' must exist")
            .id
            .0;
        let bob_id = gs
            .find_entity("bob", crate::graph::EntityType::Person)
            .await
            .unwrap()
            .expect("entity 'bob' must exist")
            .id
            .0;
        let edges = gs.edges_exact(alice_id, bob_id).await.unwrap();
        assert_eq!(edges.len(), 1, "exactly one edge expected");
        assert_eq!(
            edges[0].relation, "KNOWS",
            "display relation must preserve original casing"
        );
    }

    /// A provider whose embed call takes longer than the configured timeout must make
    /// `embed_work_items` return an empty vector.
    #[tokio::test]
    async fn embed_work_items_timeout_returns_empty() {
        // Pause tokio's clock so the 30 s timeout does not cost real wall-clock time.
        tokio::time::pause();

        let mut mock = MockProvider::default();
        mock.supports_embeddings = true;
        mock.embed_delay_ms = 31_000;
        let provider = AnyProvider::Mock(mock);

        let work_items = vec![super::EntityWorkItem {
            entity_id: 1,
            canonical_name: "Alice".to_owned(),
            embed_text: "Alice".to_owned(),
            self_point_id: None,
        }];

        let cfg = NoteLinkingConfig {
            timeout_secs: 30,
            ..NoteLinkingConfig::default()
        };
        let result = super::embed_work_items(&work_items, &provider, &cfg).await;
        assert!(
            result.is_empty(),
            "embed_work_items must return empty Vec on 30 s timeout (fail-open)"
        );
    }
}