Skip to main content

zeph_memory/semantic/
graph.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::sync::Arc;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use std::sync::atomic::Ordering;
9use zeph_db::DbPool;
10
11pub use zeph_common::config::memory::NoteLinkingConfig;
12use zeph_llm::any::AnyProvider;
13use zeph_llm::provider::LlmProvider as _;
14
15use crate::embedding_store::EmbeddingStore;
16use crate::error::MemoryError;
17use crate::graph::extractor::ExtractionResult as ExtractorResult;
18use crate::vector_store::VectorFilter;
19
20use super::SemanticMemory;
21
/// Callback type for post-extraction validation.
///
/// A generic predicate opaque to zeph-memory — callers (zeph-core) provide security
/// validation without introducing a dependency on security policy in this crate.
///
/// `None` disables validation. When a callback is present, `extract_and_store` invokes it
/// on the raw extractor output before anything is upserted; an `Err(reason)` is logged and
/// the extraction pass returns an empty result instead of persisting anything.
pub type PostExtractValidator = Option<Box<dyn Fn(&ExtractorResult) -> Result<(), String> + Send>>;
27
/// Config for the spawned background extraction task.
///
/// Owned clone of the relevant fields from `GraphConfig` — no references, safe to send to
/// spawned tasks.
#[derive(Debug, Clone)]
pub struct GraphExtractionConfig {
    /// Entity limit handed to `GraphExtractor::new`.
    pub max_entities: usize,
    /// Edge limit handed to `GraphExtractor::new`.
    pub max_edges: usize,
    /// Timeout, in seconds, wrapped around the whole `extract_and_store` call by the
    /// spawned extraction task.
    pub extraction_timeout_secs: u64,
    /// Community refresh is triggered when the persisted extraction count is a non-zero
    /// multiple of this value. `0` disables the refresh entirely.
    pub community_refresh_interval: usize,
    /// Retention window (days) forwarded to `run_graph_eviction`.
    pub expired_edge_retention_days: u32,
    /// Entity cap forwarded to `run_graph_eviction`.
    pub max_entities_cap: usize,
    /// Prompt size budget (bytes) forwarded to `detect_communities`.
    pub community_summary_max_prompt_bytes: usize,
    /// Concurrency setting forwarded to `detect_communities`.
    pub community_summary_concurrency: usize,
    /// Edge chunk size forwarded to `detect_communities`.
    pub lpa_edge_chunk_size: usize,
    /// A-MEM note linking config, cloned from `GraphConfig.note_linking`.
    pub note_linking: NoteLinkingConfig,
    /// A-MEM link weight decay lambda. Range: `(0.0, 1.0]`. Default: `0.95`.
    pub link_weight_decay_lambda: f64,
    /// Seconds between link weight decay passes. Default: `86400`.
    pub link_weight_decay_interval_secs: u64,
    /// Kumiho belief revision: enable semantic contradiction detection for edges.
    pub belief_revision_enabled: bool,
    /// Cosine similarity threshold for belief revision contradiction detection.
    pub belief_revision_similarity_threshold: f32,
    /// GAAMA episode linking: `conversation_id` to link extracted entities to their episode.
    /// `None` disables episode linking for this extraction pass.
    pub conversation_id: Option<i64>,
}
57
/// Baseline values used with struct-update syntax (`..Default::default()`).
///
/// Every extraction limit, interval and cap starts at zero; only the link-weight decay and
/// belief-revision thresholds carry non-zero defaults.
///
/// NOTE(review): a zero `extraction_timeout_secs` becomes a zero-length timeout in
/// `run_graph_extraction_task`, so callers are presumably expected to override it — confirm.
impl Default for GraphExtractionConfig {
    fn default() -> Self {
        Self {
            max_entities: 0,
            max_edges: 0,
            extraction_timeout_secs: 0,
            // Zero disables the community refresh trigger.
            community_refresh_interval: 0,
            expired_edge_retention_days: 0,
            max_entities_cap: 0,
            community_summary_max_prompt_bytes: 0,
            community_summary_concurrency: 0,
            lpa_edge_chunk_size: 0,
            note_linking: NoteLinkingConfig::default(),
            link_weight_decay_lambda: 0.95,
            // One day, expressed in seconds.
            link_weight_decay_interval_secs: 86400,
            belief_revision_enabled: false,
            belief_revision_similarity_threshold: 0.85,
            // No episode linking unless the caller supplies a conversation.
            conversation_id: None,
        }
    }
}
79
/// Stats returned from a completed extraction.
#[derive(Debug, Default)]
pub struct ExtractionStats {
    /// Number of extracted entities the resolver accepted (one per successful `resolve` call).
    pub entities_upserted: usize,
    /// Number of extracted edges for which the resolver reported a newly stored edge;
    /// deduplicated and failed edges are not counted.
    pub edges_inserted: usize,
}
86
/// Result returned from `extract_and_store`, combining stats with entity IDs needed for linking.
///
/// The `Default` value (zero stats, no IDs) is what `extract_and_store` returns when the
/// extractor yields nothing or the post-extraction validator rejects the output.
#[derive(Debug, Default)]
pub struct ExtractionResult {
    /// Counters describing what this pass persisted.
    pub stats: ExtractionStats,
    /// IDs of entities upserted during this extraction pass. Passed to `link_memory_notes`.
    pub entity_ids: Vec<i64>,
}
94
/// Stats returned from a completed note-linking pass.
#[derive(Debug, Default)]
pub struct LinkingStats {
    /// Entities whose embedding and similarity search both succeeded.
    pub entities_processed: usize,
    /// `similar_to` edges for which `insert_edge` returned `Ok`.
    pub edges_created: usize,
}
101
/// Qdrant collection name for entity embeddings (mirrors the constant in `resolver.rs`).
///
/// Used as the collection argument of every similarity search issued during note linking.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";
104
/// Work item for a single entity during a note-linking pass.
struct EntityWorkItem {
    /// Graph-store ID of the entity being linked.
    entity_id: i64,
    /// Canonical entity name; only used in log messages after the work item is built.
    canonical_name: String,
    /// Text sent to the embedding provider: `"<name>: <summary>"` when a non-empty summary
    /// exists, otherwise just the canonical name.
    embed_text: String,
    /// The entity's own vector point ID, if recorded; used to drop the self-match from
    /// search results.
    self_point_id: Option<String>,
}
112
113/// Link newly extracted entities to semantically similar entities in the graph.
114///
115/// For each entity in `entity_ids`:
116/// 1. Load the entity name + summary from `SQLite`.
117/// 2. Embed all entity texts in parallel.
118/// 3. Search the entity embedding collection in parallel for the `top_k + 1` most similar points.
119/// 4. Filter out the entity itself (by `qdrant_point_id` or `entity_id` payload) and points
120///    below `similarity_threshold`.
121/// 5. Insert a unidirectional `similar_to` edge where `source_id < target_id` to avoid
122///    double-counting in BFS recall while still being traversable via the OR clause in
123///    `edges_for_entity`. The edge confidence is set to the cosine similarity score.
124/// 6. Deduplicate pairs within a single pass so that a pair encountered from both A→B and B→A
125///    directions is only inserted once, keeping `edges_created` accurate.
126///
127/// Errors are logged and not propagated — this is a best-effort background enrichment step.
128pub async fn link_memory_notes(
129    entity_ids: &[i64],
130    pool: DbPool,
131    embedding_store: Arc<EmbeddingStore>,
132    provider: AnyProvider,
133    cfg: &NoteLinkingConfig,
134) -> LinkingStats {
135    use crate::graph::GraphStore;
136
137    let store = GraphStore::new(pool);
138    let mut stats = LinkingStats::default();
139
140    let work_items = collect_note_link_work_items(entity_ids, &store).await;
141    if work_items.is_empty() {
142        return stats;
143    }
144
145    let valid = embed_work_items(&work_items, &provider).await;
146
147    let search_limit = cfg.top_k + 1; // +1 to account for self-match
148    let search_results = search_similar_for_items(&valid, &embedding_store, search_limit).await;
149
150    insert_similarity_edges(
151        &work_items,
152        &valid,
153        &search_results,
154        cfg,
155        &store,
156        &mut stats,
157    )
158    .await;
159
160    stats
161}
162
163/// Phase 1: load entities from the DB and build work items for embedding.
164///
165/// Processes entities sequentially to avoid connection-pool contention.
166async fn collect_note_link_work_items(
167    entity_ids: &[i64],
168    store: &crate::graph::GraphStore,
169) -> Vec<EntityWorkItem> {
170    let mut work_items: Vec<EntityWorkItem> = Vec::with_capacity(entity_ids.len());
171    for &entity_id in entity_ids {
172        let entity = match store.find_entity_by_id(entity_id).await {
173            Ok(Some(e)) => e,
174            Ok(None) => {
175                tracing::debug!("note_linking: entity {entity_id} not found, skipping");
176                continue;
177            }
178            Err(e) => {
179                tracing::debug!("note_linking: DB error loading entity {entity_id}: {e:#}");
180                continue;
181            }
182        };
183        let embed_text = match &entity.summary {
184            Some(s) if !s.is_empty() => format!("{}: {s}", entity.canonical_name),
185            _ => entity.canonical_name.clone(),
186        };
187        work_items.push(EntityWorkItem {
188            entity_id,
189            canonical_name: entity.canonical_name,
190            embed_text,
191            self_point_id: entity.qdrant_point_id,
192        });
193    }
194    work_items
195}
196
197/// Phase 2: embed all entity texts in parallel.
198///
199/// Returns `(work_idx, embedding)` pairs for successfully embedded items.
200/// Items that fail to embed are logged and dropped.
201async fn embed_work_items(
202    work_items: &[EntityWorkItem],
203    provider: &AnyProvider,
204) -> Vec<(usize, Vec<f32>)> {
205    use futures::future;
206
207    let embed_results: Vec<_> =
208        future::join_all(work_items.iter().map(|w| provider.embed(&w.embed_text))).await;
209
210    embed_results
211        .into_iter()
212        .enumerate()
213        .filter_map(|(i, r)| match r {
214            Ok(v) => Some((i, v)),
215            Err(e) => {
216                tracing::debug!(
217                    "note_linking: embed failed for entity {:?}: {e:#}",
218                    work_items[i].canonical_name
219                );
220                None
221            }
222        })
223        .collect()
224}
225
226/// Phase 3: search the embedding store for similar entities for each embedded work item.
227async fn search_similar_for_items(
228    valid: &[(usize, Vec<f32>)],
229    embedding_store: &EmbeddingStore,
230    search_limit: usize,
231) -> Vec<Result<Vec<crate::ScoredVectorPoint>, MemoryError>> {
232    use futures::future;
233
234    future::join_all(valid.iter().map(|(_, vec)| {
235        embedding_store.search_collection(
236            ENTITY_COLLECTION,
237            vec,
238            search_limit,
239            None::<VectorFilter>,
240        )
241    }))
242    .await
243}
244
245/// Phase 4: insert similarity edges, deduplicating pairs seen from both A→B and B→A.
246///
247/// Without deduplication, both directions would call `insert_edge` for the same normalised
248/// pair and both return `Ok`, inflating `edges_created` by the number of bidirectional hits.
249async fn insert_similarity_edges(
250    work_items: &[EntityWorkItem],
251    valid: &[(usize, Vec<f32>)],
252    search_results: &[Result<Vec<crate::ScoredVectorPoint>, MemoryError>],
253    cfg: &NoteLinkingConfig,
254    store: &crate::graph::GraphStore,
255    stats: &mut LinkingStats,
256) {
257    let mut seen_pairs = std::collections::HashSet::new();
258
259    for ((work_idx, _), search_result) in valid.iter().zip(search_results.iter()) {
260        let w = &work_items[*work_idx];
261
262        let results = match search_result {
263            Ok(r) => r,
264            Err(e) => {
265                tracing::debug!(
266                    "note_linking: search failed for entity {:?}: {e:#}",
267                    w.canonical_name
268                );
269                continue;
270            }
271        };
272
273        stats.entities_processed += 1;
274
275        let self_point_id = w.self_point_id.as_deref();
276        let candidates = results
277            .iter()
278            .filter(|p| Some(p.id.as_str()) != self_point_id && p.score >= cfg.similarity_threshold)
279            .take(cfg.top_k);
280
281        for point in candidates {
282            let Some(target_id) = point
283                .payload
284                .get("entity_id")
285                .and_then(serde_json::Value::as_i64)
286            else {
287                tracing::debug!(
288                    "note_linking: missing entity_id in payload for point {}",
289                    point.id
290                );
291                continue;
292            };
293
294            if target_id == w.entity_id {
295                continue; // secondary self-guard when qdrant_point_id is null
296            }
297
298            // Normalise direction: always store source_id < target_id.
299            let (src, tgt) = if w.entity_id < target_id {
300                (w.entity_id, target_id)
301            } else {
302                (target_id, w.entity_id)
303            };
304
305            if !seen_pairs.insert((src, tgt)) {
306                continue;
307            }
308
309            let fact = format!("Semantically similar entities (score: {:.3})", point.score);
310
311            match store
312                .insert_edge(src, tgt, "similar_to", &fact, point.score, None)
313                .await
314            {
315                Ok(_) => stats.edges_created += 1,
316                Err(e) => {
317                    tracing::debug!("note_linking: insert_edge failed: {e:#}");
318                }
319            }
320        }
321    }
322}
323
324/// Extract entities and edges from `content` and persist them to the graph store.
325///
326/// This function runs inside a spawned task — it receives owned data only.
327///
328/// The optional `embedding_store` enables entity embedding storage in Qdrant, which is
329/// required for A-MEM note linking to find semantically similar entities across sessions.
330///
331/// # Errors
332///
333/// Returns an error if the database query fails or LLM extraction fails.
334#[cfg_attr(
335    feature = "profiling",
336    tracing::instrument(name = "memory.graph_extract", skip_all, fields(entities = tracing::field::Empty, edges = tracing::field::Empty))
337)]
338pub async fn extract_and_store(
339    content: String,
340    context_messages: Vec<String>,
341    provider: AnyProvider,
342    pool: DbPool,
343    config: GraphExtractionConfig,
344    post_extract_validator: PostExtractValidator,
345    embedding_store: Option<Arc<EmbeddingStore>>,
346) -> Result<ExtractionResult, MemoryError> {
347    use crate::graph::{EntityResolver, GraphExtractor, GraphStore};
348
349    let extractor = GraphExtractor::new(provider.clone(), config.max_entities, config.max_edges);
350    let ctx_refs: Vec<&str> = context_messages.iter().map(String::as_str).collect();
351
352    let store = GraphStore::new(pool);
353
354    bump_extraction_count(store.pool()).await?;
355
356    let Some(result) = extractor.extract(&content, &ctx_refs).await? else {
357        return Ok(ExtractionResult::default());
358    };
359
360    // Post-extraction validation callback. zeph-memory does not know the callback is a
361    // security validator — it is a generic predicate opaque to this crate (design decision D1).
362    if let Some(ref validator) = post_extract_validator
363        && let Err(reason) = validator(&result)
364    {
365        tracing::warn!(
366            reason,
367            "graph extraction validation failed, skipping upsert"
368        );
369        return Ok(ExtractionResult::default());
370    }
371
372    let resolver = if let Some(ref emb) = embedding_store {
373        EntityResolver::new(&store)
374            .with_embedding_store(emb)
375            .with_provider(&provider)
376    } else {
377        EntityResolver::new(&store)
378    };
379
380    let (entity_name_to_id, entities_upserted) = upsert_entities(&resolver, &result.entities).await;
381    let edges_inserted = insert_edges(&resolver, &result.edges, &entity_name_to_id, &config).await;
382
383    store.checkpoint_wal().await?;
384
385    let new_entity_ids: Vec<i64> = entity_name_to_id.into_values().collect();
386
387    link_episode(&store, &config, &new_entity_ids).await;
388
389    #[cfg(feature = "profiling")]
390    {
391        let span = tracing::Span::current();
392        span.record("entities", entities_upserted);
393        span.record("edges", edges_inserted);
394    }
395
396    Ok(ExtractionResult {
397        stats: ExtractionStats {
398            entities_upserted,
399            edges_inserted,
400        },
401        entity_ids: new_entity_ids,
402    })
403}
404
/// Increment the extraction counter in `graph_metadata`.
///
/// Runs two statements against `pool`: the first seeds the `extraction_count` row with
/// `'0'` when it does not exist yet, the second adds one to the stored value. The counter
/// is kept as text, so the update casts it to an integer and back.
///
/// # Errors
///
/// Propagates the error of either statement.
async fn bump_extraction_count(pool: &DbPool) -> Result<(), MemoryError> {
    // Seed the row (no-op if present) so the UPDATE below always has a row to touch.
    zeph_db::query(sql!(
        "INSERT INTO graph_metadata (key, value) VALUES ('extraction_count', '0')
         ON CONFLICT(key) DO NOTHING"
    ))
    .execute(pool)
    .await?;
    // Text -> integer -> +1 -> text.
    zeph_db::query(sql!(
        "UPDATE graph_metadata
         SET value = CAST(CAST(value AS INTEGER) + 1 AS TEXT)
         WHERE key = 'extraction_count'"
    ))
    .execute(pool)
    .await?;
    Ok(())
}
422
423/// Upsert all extracted entities and return the name-to-id map and upsert count.
424async fn upsert_entities(
425    resolver: &crate::graph::EntityResolver<'_>,
426    entities: &[crate::graph::extractor::ExtractedEntity],
427) -> (std::collections::HashMap<String, i64>, usize) {
428    let mut entity_name_to_id: std::collections::HashMap<String, i64> =
429        std::collections::HashMap::new();
430    let mut entities_upserted = 0usize;
431
432    for entity in entities {
433        match resolver
434            .resolve(&entity.name, &entity.entity_type, entity.summary.as_deref())
435            .await
436        {
437            Ok((id, _outcome)) => {
438                entity_name_to_id.insert(entity.name.clone(), id);
439                entities_upserted += 1;
440            }
441            Err(e) => {
442                tracing::debug!("graph: skipping entity {:?}: {e:#}", entity.name);
443            }
444        }
445    }
446
447    (entity_name_to_id, entities_upserted)
448}
449
450/// Insert extracted edges that have both endpoints in `name_to_id`.
451///
452/// Returns the number of edges actually inserted.
453async fn insert_edges(
454    resolver: &crate::graph::EntityResolver<'_>,
455    edges: &[crate::graph::extractor::ExtractedEdge],
456    name_to_id: &std::collections::HashMap<String, i64>,
457    config: &GraphExtractionConfig,
458) -> usize {
459    let mut edges_inserted = 0usize;
460    for edge in edges {
461        let (Some(&src_id), Some(&tgt_id)) =
462            (name_to_id.get(&edge.source), name_to_id.get(&edge.target))
463        else {
464            tracing::debug!(
465                "graph: skipping edge {:?}->{:?}: entity not resolved",
466                edge.source,
467                edge.target
468            );
469            continue;
470        };
471        if src_id == tgt_id {
472            tracing::debug!(
473                "graph: skipping self-loop edge {:?}->{:?} (entity_id={src_id})",
474                edge.source,
475                edge.target
476            );
477            continue;
478        }
479        // Parse LLM-provided edge_type; default to Semantic on any parse failure so
480        // edges are never dropped due to classification errors.
481        let edge_type = edge
482            .edge_type
483            .parse::<crate::graph::EdgeType>()
484            .unwrap_or_else(|_| {
485                tracing::warn!(
486                    raw_type = %edge.edge_type,
487                    "graph: unknown edge_type from LLM, defaulting to semantic"
488                );
489                crate::graph::EdgeType::Semantic
490            });
491        let belief_cfg =
492            config
493                .belief_revision_enabled
494                .then_some(crate::graph::BeliefRevisionConfig {
495                    similarity_threshold: config.belief_revision_similarity_threshold,
496                });
497        match resolver
498            .resolve_edge_typed(
499                src_id,
500                tgt_id,
501                &edge.relation,
502                &edge.fact,
503                0.8,
504                None,
505                edge_type,
506                belief_cfg.as_ref(),
507            )
508            .await
509        {
510            Ok(Some(_)) => edges_inserted += 1,
511            Ok(None) => {} // deduplicated
512            Err(e) => {
513                tracing::debug!("graph: skipping edge: {e:#}");
514            }
515        }
516    }
517    edges_inserted
518}
519
520/// Link extracted entities to their GAAMA episode when a conversation ID is configured.
521async fn link_episode(
522    store: &crate::graph::GraphStore,
523    config: &GraphExtractionConfig,
524    entity_ids: &[i64],
525) {
526    let Some(conv_id) = config.conversation_id else {
527        return;
528    };
529    match store.ensure_episode(conv_id).await {
530        Ok(episode_id) => {
531            for &entity_id in entity_ids {
532                if let Err(e) = store.link_entity_to_episode(episode_id, entity_id).await {
533                    tracing::debug!("episode linking skipped for entity {entity_id}: {e:#}");
534                }
535            }
536        }
537        Err(e) => {
538            tracing::warn!("failed to ensure episode for conversation {conv_id}: {e:#}");
539        }
540    }
541}
542
543impl SemanticMemory {
544    /// Spawn background graph extraction for a message. Fire-and-forget — never blocks.
545    ///
546    /// Extraction runs in a separate tokio task with a timeout. Any error or timeout is
547    /// logged and the task exits silently; the agent response is never blocked.
548    ///
549    /// The optional `post_extract_validator` is called after extraction, before upsert.
550    /// It is a generic predicate opaque to zeph-memory (design decision D1).
551    ///
552    /// When `config.note_linking.enabled` is `true` and an embedding store is available,
553    /// `link_memory_notes` runs after successful extraction inside the same task, bounded
554    /// by `config.note_linking.timeout_secs`.
555    pub fn spawn_graph_extraction(
556        &self,
557        content: String,
558        context_messages: Vec<String>,
559        config: GraphExtractionConfig,
560        post_extract_validator: PostExtractValidator,
561    ) -> tokio::task::JoinHandle<()> {
562        let ctx = GraphExtractionTaskCtx {
563            pool: self.sqlite.pool().clone(),
564            provider: self.provider.clone(),
565            failure_counter: self.community_detection_failures.clone(),
566            extraction_count: self.graph_extraction_count.clone(),
567            extraction_failures: self.graph_extraction_failures.clone(),
568            embedding_store: self.qdrant.clone(),
569        };
570
571        tokio::spawn(run_graph_extraction_task(
572            content,
573            context_messages,
574            config,
575            post_extract_validator,
576            ctx,
577        ))
578    }
579}
580
/// Owned context bundled for the spawned extraction task.
///
/// Bundles the Arcs that must be cloned before entering `tokio::spawn`.
struct GraphExtractionTaskCtx {
    /// Database pool used to build the graph stores inside the task.
    pool: DbPool,
    /// LLM provider used for extraction, embeddings and community detection.
    provider: AnyProvider,
    /// Incremented when community detection fails.
    failure_counter: Arc<std::sync::atomic::AtomicU64>,
    /// Incremented after each successful extraction.
    extraction_count: Arc<std::sync::atomic::AtomicU64>,
    /// Incremented when extraction fails or times out.
    extraction_failures: Arc<std::sync::atomic::AtomicU64>,
    /// Optional embedding store; forwarded to extraction and required for note linking.
    embedding_store: Option<Arc<EmbeddingStore>>,
}
592
593/// Body of the spawned graph-extraction task.
594async fn run_graph_extraction_task(
595    content: String,
596    context_messages: Vec<String>,
597    config: GraphExtractionConfig,
598    post_extract_validator: PostExtractValidator,
599    ctx: GraphExtractionTaskCtx,
600) {
601    let timeout_dur = std::time::Duration::from_secs(config.extraction_timeout_secs);
602    let extraction_result = tokio::time::timeout(
603        timeout_dur,
604        extract_and_store(
605            content,
606            context_messages,
607            ctx.provider.clone(),
608            ctx.pool.clone(),
609            config.clone(),
610            post_extract_validator,
611            ctx.embedding_store.clone(),
612        ),
613    )
614    .await;
615
616    let (extraction_ok, new_entity_ids) = match extraction_result {
617        Ok(Ok(result)) => {
618            tracing::debug!(
619                entities = result.stats.entities_upserted,
620                edges = result.stats.edges_inserted,
621                "graph extraction completed"
622            );
623            ctx.extraction_count.fetch_add(1, Ordering::Relaxed);
624            (true, result.entity_ids)
625        }
626        Ok(Err(e)) => {
627            tracing::warn!("graph extraction failed: {e:#}");
628            ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
629            (false, vec![])
630        }
631        Err(_elapsed) => {
632            tracing::warn!("graph extraction timed out");
633            ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
634            (false, vec![])
635        }
636    };
637
638    run_note_linking(
639        extraction_ok,
640        &new_entity_ids,
641        ctx.pool.clone(),
642        ctx.embedding_store,
643        ctx.provider.clone(),
644        &config,
645    )
646    .await;
647
648    maybe_refresh_communities(
649        extraction_ok,
650        ctx.pool,
651        ctx.provider,
652        ctx.failure_counter,
653        &config,
654    )
655    .await;
656}
657
658/// Run A-MEM note linking after successful extraction when enabled.
659async fn run_note_linking(
660    extraction_ok: bool,
661    new_entity_ids: &[i64],
662    pool: DbPool,
663    embedding_store: Option<Arc<EmbeddingStore>>,
664    provider: AnyProvider,
665    config: &GraphExtractionConfig,
666) {
667    if !extraction_ok || !config.note_linking.enabled || new_entity_ids.is_empty() {
668        return;
669    }
670    let Some(store) = embedding_store else {
671        return;
672    };
673    let linking_timeout = std::time::Duration::from_secs(config.note_linking.timeout_secs);
674    match tokio::time::timeout(
675        linking_timeout,
676        link_memory_notes(new_entity_ids, pool, store, provider, &config.note_linking),
677    )
678    .await
679    {
680        Ok(stats) => {
681            tracing::debug!(
682                entities_processed = stats.entities_processed,
683                edges_created = stats.edges_created,
684                "note linking completed"
685            );
686        }
687        Err(_elapsed) => {
688            tracing::debug!("note linking timed out (partial edges may exist)");
689        }
690    }
691}
692
693/// Trigger community detection, graph eviction, and link-weight decay when the extraction
694/// count hits the configured refresh interval.
695async fn maybe_refresh_communities(
696    extraction_ok: bool,
697    pool: DbPool,
698    provider: AnyProvider,
699    failure_counter: Arc<std::sync::atomic::AtomicU64>,
700    config: &GraphExtractionConfig,
701) {
702    use crate::graph::GraphStore;
703
704    if !extraction_ok || config.community_refresh_interval == 0 {
705        return;
706    }
707
708    let store = GraphStore::new(pool.clone());
709    let extraction_count = store.extraction_count().await.unwrap_or(0);
710    if extraction_count == 0
711        || !i64::try_from(config.community_refresh_interval)
712            .is_ok_and(|interval| extraction_count % interval == 0)
713    {
714        return;
715    }
716
717    tracing::info!(extraction_count, "triggering community detection refresh");
718    let store2 = GraphStore::new(pool);
719    let provider2 = provider;
720    let retention_days = config.expired_edge_retention_days;
721    let max_cap = config.max_entities_cap;
722    let max_prompt_bytes = config.community_summary_max_prompt_bytes;
723    let concurrency = config.community_summary_concurrency;
724    let edge_chunk_size = config.lpa_edge_chunk_size;
725    let decay_lambda = config.link_weight_decay_lambda;
726    let decay_interval_secs = config.link_weight_decay_interval_secs;
727    tokio::spawn(async move {
728        match crate::graph::community::detect_communities(
729            &store2,
730            &provider2,
731            max_prompt_bytes,
732            concurrency,
733            edge_chunk_size,
734        )
735        .await
736        {
737            Ok(count) => {
738                tracing::info!(communities = count, "community detection complete");
739            }
740            Err(e) => {
741                tracing::warn!("community detection failed: {e:#}");
742                failure_counter.fetch_add(1, Ordering::Relaxed);
743            }
744        }
745        match crate::graph::community::run_graph_eviction(&store2, retention_days, max_cap).await {
746            Ok(stats) => {
747                tracing::info!(
748                    expired_edges = stats.expired_edges_deleted,
749                    orphan_entities = stats.orphan_entities_deleted,
750                    capped_entities = stats.capped_entities_deleted,
751                    "graph eviction complete"
752                );
753            }
754            Err(e) => {
755                tracing::warn!("graph eviction failed: {e:#}");
756            }
757        }
758
759        // Time-based link weight decay — independent of eviction cycle.
760        if decay_lambda > 0.0 && decay_interval_secs > 0 {
761            let now_secs = std::time::SystemTime::now()
762                .duration_since(std::time::UNIX_EPOCH)
763                .map_or(0, |d| d.as_secs());
764            let last_decay = store2
765                .get_metadata("last_link_weight_decay_at")
766                .await
767                .ok()
768                .flatten()
769                .and_then(|s| s.parse::<u64>().ok())
770                .unwrap_or(0);
771            if now_secs.saturating_sub(last_decay) >= decay_interval_secs {
772                match store2
773                    .decay_edge_retrieval_counts(decay_lambda, decay_interval_secs)
774                    .await
775                {
776                    Ok(affected) => {
777                        tracing::info!(affected, "link weight decay applied");
778                        let _ = store2
779                            .set_metadata("last_link_weight_decay_at", &now_secs.to_string())
780                            .await;
781                    }
782                    Err(e) => {
783                        tracing::warn!("link weight decay failed: {e:#}");
784                    }
785                }
786            }
787        }
788    });
789}
790
791#[cfg(test)]
792mod tests {
793    use std::sync::Arc;
794
795    use zeph_llm::any::AnyProvider;
796
797    use super::extract_and_store;
798    use crate::embedding_store::EmbeddingStore;
799    use crate::graph::GraphStore;
800    use crate::in_memory_store::InMemoryVectorStore;
801    use crate::store::SqliteStore;
802
803    use super::GraphExtractionConfig;
804
805    async fn setup() -> (GraphStore, Arc<EmbeddingStore>) {
806        let sqlite = SqliteStore::new(":memory:").await.unwrap();
807        let pool = sqlite.pool().clone();
808        let mem_store = Box::new(InMemoryVectorStore::new());
809        let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool.clone()));
810        let gs = GraphStore::new(pool);
811        (gs, emb)
812    }
813
    /// Regression test for #1829: `extract_and_store()` must pass the provider to `EntityResolver`
    /// so that `store_entity_embedding()` is called and `qdrant_point_id` is set in `SQLite`.
    ///
    /// Runs one extraction with a mock provider that supports embeddings and an in-memory
    /// embedding store, then checks the stored entity carries a point ID.
    #[tokio::test]
    async fn extract_and_store_sets_qdrant_point_id_when_embedding_store_provided() {
        let (gs, emb) = setup().await;

        // MockProvider: supports embeddings, returns a valid extraction JSON for chat
        let extraction_json = r#"{"entities":[{"name":"Rust","type":"language","summary":"systems language"}],"edges":[]}"#;
        let mut mock =
            zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
        mock.supports_embeddings = true;
        mock.embedding = vec![1.0_f32, 0.0, 0.0, 0.0];
        let provider = AnyProvider::Mock(mock);

        // Only the extraction limits matter here; everything else keeps its default.
        let config = GraphExtractionConfig {
            max_entities: 10,
            max_edges: 10,
            extraction_timeout_secs: 10,
            ..Default::default()
        };

        // No validator, embedding store supplied.
        let result = extract_and_store(
            "Rust is a systems programming language.".to_owned(),
            vec![],
            provider,
            gs.pool().clone(),
            config,
            None,
            Some(emb.clone()),
        )
        .await
        .unwrap();

        assert_eq!(
            result.stats.entities_upserted, 1,
            "one entity should be upserted"
        );

        // The entity must have a qdrant_point_id — this proves store_entity_embedding() was called.
        // Before the fix, EntityResolver was built without a provider, so embed() was never called
        // and qdrant_point_id remained NULL.
        let entity = gs
            .find_entity("rust", crate::graph::EntityType::Language)
            .await
            .unwrap()
            .expect("entity 'rust' must exist in SQLite");

        assert!(
            entity.qdrant_point_id.is_some(),
            "qdrant_point_id must be set when embedding_store + provider are both provided (regression for #1829)"
        );
    }
866
867    /// When no `embedding_store` is provided, `extract_and_store()` must still work correctly
868    /// (no embeddings stored, but entities are still upserted).
869    #[tokio::test]
870    async fn extract_and_store_without_embedding_store_still_upserts_entities() {
871        let (gs, _emb) = setup().await;
872
873        let extraction_json = r#"{"entities":[{"name":"Python","type":"language","summary":"scripting"}],"edges":[]}"#;
874        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
875        let provider = AnyProvider::Mock(mock);
876
877        let config = GraphExtractionConfig {
878            max_entities: 10,
879            max_edges: 10,
880            extraction_timeout_secs: 10,
881            ..Default::default()
882        };
883
884        let result = extract_and_store(
885            "Python is a scripting language.".to_owned(),
886            vec![],
887            provider,
888            gs.pool().clone(),
889            config,
890            None,
891            None, // no embedding_store
892        )
893        .await
894        .unwrap();
895
896        assert_eq!(result.stats.entities_upserted, 1);
897
898        let entity = gs
899            .find_entity("python", crate::graph::EntityType::Language)
900            .await
901            .unwrap()
902            .expect("entity 'python' must exist");
903
904        assert!(
905            entity.qdrant_point_id.is_none(),
906            "qdrant_point_id must remain None when no embedding_store is provided"
907        );
908    }
909
910    /// Regression test for #2166: FTS5 entity writes must be visible to a new connection pool
911    /// opened after extraction completes. Without `checkpoint_wal()` in `extract_and_store`,
912    /// a fresh pool sees stale FTS5 shadow tables and `find_entities_fuzzy` returns empty.
913    #[tokio::test]
914    async fn extract_and_store_fts5_cross_session_visibility() {
915        let file = tempfile::NamedTempFile::new().expect("tempfile");
916        let path = file.path().to_str().expect("valid path").to_string();
917
918        // Session A: run extract_and_store on a file DB (not :memory:) so WAL is used.
919        {
920            let sqlite = crate::store::SqliteStore::new(&path).await.unwrap();
921            let extraction_json = r#"{"entities":[{"name":"Ferris","type":"concept","summary":"Rust mascot"}],"edges":[]}"#;
922            let mock =
923                zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
924            let provider = AnyProvider::Mock(mock);
925            let config = GraphExtractionConfig {
926                max_entities: 10,
927                max_edges: 10,
928                extraction_timeout_secs: 10,
929                ..Default::default()
930            };
931            extract_and_store(
932                "Ferris is the Rust mascot.".to_owned(),
933                vec![],
934                provider,
935                sqlite.pool().clone(),
936                config,
937                None,
938                None,
939            )
940            .await
941            .unwrap();
942        }
943
944        // Session B: new pool — FTS5 must see the entity extracted in session A.
945        let sqlite_b = crate::store::SqliteStore::new(&path).await.unwrap();
946        let gs_b = crate::graph::GraphStore::new(sqlite_b.pool().clone());
947        let results = gs_b.find_entities_fuzzy("Ferris", 10).await.unwrap();
948        assert!(
949            !results.is_empty(),
950            "FTS5 cross-session (#2166): entity extracted in session A must be visible in session B"
951        );
952    }
953
954    /// Regression test for #2215: self-loop edges (source == target entity) must be silently
955    /// skipped; no edge row should be inserted.
956    #[tokio::test]
957    async fn extract_and_store_skips_self_loop_edges() {
958        let (gs, _emb) = setup().await;
959
960        // LLM returns one entity and one self-loop edge (source == target).
961        let extraction_json = r#"{
962            "entities":[{"name":"Rust","type":"language","summary":"systems language"}],
963            "edges":[{"source":"Rust","target":"Rust","relation":"is","fact":"Rust is Rust","edge_type":"semantic"}]
964        }"#;
965        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
966        let provider = AnyProvider::Mock(mock);
967
968        let config = GraphExtractionConfig {
969            max_entities: 10,
970            max_edges: 10,
971            extraction_timeout_secs: 10,
972            ..Default::default()
973        };
974
975        let result = extract_and_store(
976            "Rust is a language.".to_owned(),
977            vec![],
978            provider,
979            gs.pool().clone(),
980            config,
981            None,
982            None,
983        )
984        .await
985        .unwrap();
986
987        assert_eq!(result.stats.entities_upserted, 1);
988        assert_eq!(
989            result.stats.edges_inserted, 0,
990            "self-loop edge must be rejected (#2215)"
991        );
992    }
993}