Skip to main content

zeph_memory/semantic/
graph.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::sync::Arc;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use std::sync::atomic::Ordering;
9use zeph_db::DbPool;
10
11pub use zeph_common::config::memory::NoteLinkingConfig;
12use zeph_common::sanitize::strip_control_chars;
13use zeph_common::text::truncate_to_bytes_ref;
14use zeph_llm::any::AnyProvider;
15use zeph_llm::provider::LlmProvider as _;
16
17use crate::embedding_store::EmbeddingStore;
18use crate::error::MemoryError;
19use crate::graph::extractor::ExtractionResult as ExtractorResult;
20use crate::vector_store::VectorFilter;
21
22use super::SemanticMemory;
23
/// Callback type for post-extraction validation.
///
/// A generic predicate opaque to zeph-memory — callers (zeph-core) provide security
/// validation without introducing a dependency on security policy in this crate.
///
/// `None` means no validation is performed. When a callback is present, `Ok(())` accepts the
/// extraction result and `Err(reason)` rejects it: `extract_and_store` logs `reason` and
/// returns an empty result without persisting anything.
///
/// The boxed closure must be `Send` (it is moved into a spawned task) but is not required
/// to be `Sync`.
pub type PostExtractValidator = Option<Box<dyn Fn(&ExtractorResult) -> Result<(), String> + Send>>;
29
/// Config for the spawned background extraction task.
///
/// Owned clone of the relevant fields from `GraphConfig` — no references, safe to send to
/// spawned tasks.
#[derive(Debug, Clone)]
pub struct GraphExtractionConfig {
    /// Entity limit handed to `GraphExtractor::new`.
    pub max_entities: usize,
    /// Edge limit handed to `GraphExtractor::new`.
    pub max_edges: usize,
    /// Timeout, in seconds, wrapped around the whole `extract_and_store` call by the
    /// spawned extraction task.
    pub extraction_timeout_secs: u64,
    /// Community refresh is triggered when the persisted extraction count is a non-zero
    /// multiple of this value. `0` disables the refresh entirely.
    pub community_refresh_interval: usize,
    /// Forwarded to `run_graph_eviction` (presumably the retention period, in days, for
    /// expired edges — confirm against the eviction implementation).
    pub expired_edge_retention_days: u32,
    /// Forwarded to `run_graph_eviction` (presumably the maximum number of entities kept —
    /// confirm against the eviction implementation).
    pub max_entities_cap: usize,
    /// Forwarded to `detect_communities` as its prompt-size argument.
    pub community_summary_max_prompt_bytes: usize,
    /// Forwarded to `detect_communities` as its concurrency argument.
    pub community_summary_concurrency: usize,
    /// Forwarded to `detect_communities` as its edge chunk size argument.
    pub lpa_edge_chunk_size: usize,
    /// A-MEM note linking config, cloned from `GraphConfig.note_linking`.
    pub note_linking: NoteLinkingConfig,
    /// A-MEM link weight decay lambda. Range: `(0.0, 1.0]`. Default: `0.95`.
    ///
    /// Decay is skipped unless this value is strictly greater than `0.0`.
    pub link_weight_decay_lambda: f64,
    /// Seconds between link weight decay passes. Default: `86400`.
    ///
    /// `0` disables the decay pass.
    pub link_weight_decay_interval_secs: u64,
    /// Kumiho belief revision: enable semantic contradiction detection for edges.
    pub belief_revision_enabled: bool,
    /// Cosine similarity threshold for belief revision contradiction detection.
    pub belief_revision_similarity_threshold: f32,
    /// GAAMA episode linking: `conversation_id` to link extracted entities to their episode.
    /// `None` disables episode linking for this extraction pass.
    pub conversation_id: Option<i64>,
    /// APEX-MEM: use `insert_or_supersede` instead of `resolve_edge_typed`. Default: `false`.
    pub apex_mem_enabled: bool,
}
61
/// Baseline values for `GraphExtractionConfig`.
///
/// All limits, intervals and budgets start at `0`; only the link-weight decay and
/// belief-revision parameters carry non-zero defaults.
///
/// NOTE(review): `extraction_timeout_secs: 0` turns into a zero-length timeout in
/// `run_graph_extraction_task`, and `community_refresh_interval: 0` disables community
/// refresh. Callers are presumably expected to overwrite these from `GraphConfig` — confirm.
impl Default for GraphExtractionConfig {
    fn default() -> Self {
        Self {
            max_entities: 0,
            max_edges: 0,
            extraction_timeout_secs: 0,
            community_refresh_interval: 0,
            expired_edge_retention_days: 0,
            max_entities_cap: 0,
            community_summary_max_prompt_bytes: 0,
            community_summary_concurrency: 0,
            lpa_edge_chunk_size: 0,
            note_linking: NoteLinkingConfig::default(),
            // Matches the documented default on the field.
            link_weight_decay_lambda: 0.95,
            // One day, in seconds.
            link_weight_decay_interval_secs: 86400,
            belief_revision_enabled: false,
            belief_revision_similarity_threshold: 0.85,
            // No episode linking unless the caller supplies a conversation.
            conversation_id: None,
            apex_mem_enabled: false,
        }
    }
}
84
/// Stats returned from a completed extraction.
///
/// Plain counters, so the type is `Copy` and comparable; this makes it easy to assert on
/// in tests and to pass around without borrowing.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ExtractionStats {
    /// Number of extracted entities that were successfully resolved/upserted.
    pub entities_upserted: usize,
    /// Number of edges that were actually inserted (deduplicated edges are not counted).
    pub edges_inserted: usize,
}

/// Result returned from `extract_and_store`, combining stats with entity IDs needed for linking.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ExtractionResult {
    /// Counters describing what the extraction pass persisted.
    pub stats: ExtractionStats,
    /// IDs of entities upserted during this extraction pass. Passed to `link_memory_notes`.
    pub entity_ids: Vec<i64>,
}
99
/// Stats returned from a completed note-linking pass.
///
/// Plain counters, so the type is `Copy` and comparable.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct LinkingStats {
    /// Number of entities whose similarity search succeeded and was examined.
    pub entities_processed: usize,
    /// Number of `similar_to` edges for which `insert_edge` returned `Ok`.
    pub edges_created: usize,
}
106
/// Qdrant collection name for entity embeddings (mirrors the constant in `resolver.rs`).
///
/// Used as the search target when looking for semantically similar entities during
/// note linking.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Mirrors the constant from `graph/resolver/mod.rs` — used for sanitizing APEX-MEM inputs.
///
/// Upper bound, in bytes, applied to both the display and canonical relation strings
/// before they are written on the APEX-MEM path.
const MAX_RELATION_BYTES: usize = 256;
/// Mirrors the constant from `graph/resolver/mod.rs` — used for sanitizing APEX-MEM inputs.
///
/// Upper bound, in bytes, applied to the edge fact before it is written on the APEX-MEM path.
const MAX_FACT_BYTES: usize = 2048;
114
/// Work item for a single entity during a note-linking pass.
struct EntityWorkItem {
    /// Database ID of the entity being linked.
    entity_id: i64,
    /// Canonical entity name; used in diagnostics when embedding or search fails.
    canonical_name: String,
    /// Text sent to the embedding provider: `"<name>: <summary>"` when a non-empty summary
    /// exists, otherwise just the canonical name.
    embed_text: String,
    /// The entity's own vector point ID, if it has one; used to drop the self-match from
    /// similarity search results.
    self_point_id: Option<String>,
}
122
123/// Link newly extracted entities to semantically similar entities in the graph.
124///
125/// For each entity in `entity_ids`:
126/// 1. Load the entity name + summary from `SQLite`.
127/// 2. Embed all entity texts in parallel.
128/// 3. Search the entity embedding collection in parallel for the `top_k + 1` most similar points.
129/// 4. Filter out the entity itself (by `qdrant_point_id` or `entity_id` payload) and points
130///    below `similarity_threshold`.
131/// 5. Insert a unidirectional `similar_to` edge where `source_id < target_id` to avoid
132///    double-counting in BFS recall while still being traversable via the OR clause in
133///    `edges_for_entity`. The edge confidence is set to the cosine similarity score.
134/// 6. Deduplicate pairs within a single pass so that a pair encountered from both A→B and B→A
135///    directions is only inserted once, keeping `edges_created` accurate.
136///
137/// Errors are logged and not propagated — this is a best-effort background enrichment step.
138pub async fn link_memory_notes(
139    entity_ids: &[i64],
140    pool: DbPool,
141    embedding_store: Arc<EmbeddingStore>,
142    provider: AnyProvider,
143    cfg: &NoteLinkingConfig,
144) -> LinkingStats {
145    use crate::graph::GraphStore;
146
147    let store = GraphStore::new(pool);
148    let mut stats = LinkingStats::default();
149
150    let work_items = collect_note_link_work_items(entity_ids, &store).await;
151    if work_items.is_empty() {
152        return stats;
153    }
154
155    let valid = embed_work_items(&work_items, &provider).await;
156
157    let search_limit = cfg.top_k + 1; // +1 to account for self-match
158    let search_results = search_similar_for_items(&valid, &embedding_store, search_limit).await;
159
160    insert_similarity_edges(
161        &work_items,
162        &valid,
163        &search_results,
164        cfg,
165        &store,
166        &mut stats,
167    )
168    .await;
169
170    stats
171}
172
173/// Phase 1: load entities from the DB and build work items for embedding.
174///
175/// Processes entities sequentially to avoid connection-pool contention.
176async fn collect_note_link_work_items(
177    entity_ids: &[i64],
178    store: &crate::graph::GraphStore,
179) -> Vec<EntityWorkItem> {
180    let mut work_items: Vec<EntityWorkItem> = Vec::with_capacity(entity_ids.len());
181    for &entity_id in entity_ids {
182        let entity = match store.find_entity_by_id(entity_id).await {
183            Ok(Some(e)) => e,
184            Ok(None) => {
185                tracing::debug!("note_linking: entity {entity_id} not found, skipping");
186                continue;
187            }
188            Err(e) => {
189                tracing::debug!("note_linking: DB error loading entity {entity_id}: {e:#}");
190                continue;
191            }
192        };
193        let embed_text = match &entity.summary {
194            Some(s) if !s.is_empty() => format!("{}: {s}", entity.canonical_name),
195            _ => entity.canonical_name.clone(),
196        };
197        work_items.push(EntityWorkItem {
198            entity_id,
199            canonical_name: entity.canonical_name,
200            embed_text,
201            self_point_id: entity.qdrant_point_id,
202        });
203    }
204    work_items
205}
206
207/// Phase 2: embed all entity texts in parallel.
208///
209/// Returns `(work_idx, embedding)` pairs for successfully embedded items.
210/// Items that fail to embed are logged and dropped.
211async fn embed_work_items(
212    work_items: &[EntityWorkItem],
213    provider: &AnyProvider,
214) -> Vec<(usize, Vec<f32>)> {
215    use futures::future;
216
217    let embed_results: Vec<_> =
218        future::join_all(work_items.iter().map(|w| provider.embed(&w.embed_text))).await;
219
220    embed_results
221        .into_iter()
222        .enumerate()
223        .filter_map(|(i, r)| match r {
224            Ok(v) => Some((i, v)),
225            Err(e) => {
226                tracing::debug!(
227                    "note_linking: embed failed for entity {:?}: {e:#}",
228                    work_items[i].canonical_name
229                );
230                None
231            }
232        })
233        .collect()
234}
235
236/// Phase 3: search the embedding store for similar entities for each embedded work item.
237async fn search_similar_for_items(
238    valid: &[(usize, Vec<f32>)],
239    embedding_store: &EmbeddingStore,
240    search_limit: usize,
241) -> Vec<Result<Vec<crate::ScoredVectorPoint>, MemoryError>> {
242    use futures::future;
243
244    future::join_all(valid.iter().map(|(_, vec)| {
245        embedding_store.search_collection(
246            ENTITY_COLLECTION,
247            vec,
248            search_limit,
249            None::<VectorFilter>,
250        )
251    }))
252    .await
253}
254
255/// Phase 4: insert similarity edges, deduplicating pairs seen from both A→B and B→A.
256///
257/// Without deduplication, both directions would call `insert_edge` for the same normalised
258/// pair and both return `Ok`, inflating `edges_created` by the number of bidirectional hits.
259async fn insert_similarity_edges(
260    work_items: &[EntityWorkItem],
261    valid: &[(usize, Vec<f32>)],
262    search_results: &[Result<Vec<crate::ScoredVectorPoint>, MemoryError>],
263    cfg: &NoteLinkingConfig,
264    store: &crate::graph::GraphStore,
265    stats: &mut LinkingStats,
266) {
267    let mut seen_pairs = std::collections::HashSet::new();
268
269    for ((work_idx, _), search_result) in valid.iter().zip(search_results.iter()) {
270        let w = &work_items[*work_idx];
271
272        let results = match search_result {
273            Ok(r) => r,
274            Err(e) => {
275                tracing::debug!(
276                    "note_linking: search failed for entity {:?}: {e:#}",
277                    w.canonical_name
278                );
279                continue;
280            }
281        };
282
283        stats.entities_processed += 1;
284
285        let self_point_id = w.self_point_id.as_deref();
286        let candidates = results
287            .iter()
288            .filter(|p| Some(p.id.as_str()) != self_point_id && p.score >= cfg.similarity_threshold)
289            .take(cfg.top_k);
290
291        for point in candidates {
292            let Some(target_id) = point
293                .payload
294                .get("entity_id")
295                .and_then(serde_json::Value::as_i64)
296            else {
297                tracing::debug!(
298                    "note_linking: missing entity_id in payload for point {}",
299                    point.id
300                );
301                continue;
302            };
303
304            if target_id == w.entity_id {
305                continue; // secondary self-guard when qdrant_point_id is null
306            }
307
308            // Normalise direction: always store source_id < target_id.
309            let (src, tgt) = if w.entity_id < target_id {
310                (w.entity_id, target_id)
311            } else {
312                (target_id, w.entity_id)
313            };
314
315            if !seen_pairs.insert((src, tgt)) {
316                continue;
317            }
318
319            let fact = format!("Semantically similar entities (score: {:.3})", point.score);
320
321            match store
322                .insert_edge(src, tgt, "similar_to", &fact, point.score, None)
323                .await
324            {
325                Ok(_) => stats.edges_created += 1,
326                Err(e) => {
327                    tracing::debug!("note_linking: insert_edge failed: {e:#}");
328                }
329            }
330        }
331    }
332}
333
334/// Extract entities and edges from `content` and persist them to the graph store.
335///
336/// This function runs inside a spawned task — it receives owned data only.
337///
338/// The optional `embedding_store` enables entity embedding storage in Qdrant, which is
339/// required for A-MEM note linking to find semantically similar entities across sessions.
340///
341/// # Errors
342///
343/// Returns an error if the database query fails or LLM extraction fails.
344#[cfg_attr(
345    feature = "profiling",
346    tracing::instrument(name = "memory.graph_extract", skip_all, fields(entities = tracing::field::Empty, edges = tracing::field::Empty))
347)]
348pub async fn extract_and_store(
349    content: String,
350    context_messages: Vec<String>,
351    provider: AnyProvider,
352    pool: DbPool,
353    config: GraphExtractionConfig,
354    post_extract_validator: PostExtractValidator,
355    embedding_store: Option<Arc<EmbeddingStore>>,
356) -> Result<ExtractionResult, MemoryError> {
357    use crate::graph::{EntityResolver, GraphExtractor, GraphStore};
358
359    let extractor = GraphExtractor::new(provider.clone(), config.max_entities, config.max_edges);
360    let ctx_refs: Vec<&str> = context_messages.iter().map(String::as_str).collect();
361
362    let store = GraphStore::new(pool);
363
364    bump_extraction_count(store.pool()).await?;
365
366    let Some(result) = extractor.extract(&content, &ctx_refs).await? else {
367        return Ok(ExtractionResult::default());
368    };
369
370    // Post-extraction validation callback. zeph-memory does not know the callback is a
371    // security validator — it is a generic predicate opaque to this crate (design decision D1).
372    if let Some(ref validator) = post_extract_validator
373        && let Err(reason) = validator(&result)
374    {
375        tracing::warn!(
376            reason,
377            "graph extraction validation failed, skipping upsert"
378        );
379        return Ok(ExtractionResult::default());
380    }
381
382    let resolver = if let Some(ref emb) = embedding_store {
383        EntityResolver::new(&store)
384            .with_embedding_store(emb)
385            .with_provider(&provider)
386    } else {
387        EntityResolver::new(&store)
388    };
389
390    let (entity_name_to_id, entities_upserted) = upsert_entities(&resolver, &result.entities).await;
391    let edges_inserted = insert_edges(&resolver, &result.edges, &entity_name_to_id, &config).await;
392
393    store.checkpoint_wal().await?;
394
395    let new_entity_ids: Vec<i64> = entity_name_to_id.into_values().collect();
396
397    link_episode(&store, &config, &new_entity_ids).await;
398
399    #[cfg(feature = "profiling")]
400    {
401        let span = tracing::Span::current();
402        span.record("entities", entities_upserted);
403        span.record("edges", edges_inserted);
404    }
405
406    Ok(ExtractionResult {
407        stats: ExtractionStats {
408            entities_upserted,
409            edges_inserted,
410        },
411        entity_ids: new_entity_ids,
412    })
413}
414
415/// Increment the extraction counter in `graph_metadata`.
416async fn bump_extraction_count(pool: &DbPool) -> Result<(), MemoryError> {
417    zeph_db::query(sql!(
418        "INSERT INTO graph_metadata (key, value) VALUES ('extraction_count', '0')
419         ON CONFLICT(key) DO NOTHING"
420    ))
421    .execute(pool)
422    .await?;
423    zeph_db::query(sql!(
424        "UPDATE graph_metadata
425         SET value = CAST(CAST(value AS INTEGER) + 1 AS TEXT)
426         WHERE key = 'extraction_count'"
427    ))
428    .execute(pool)
429    .await?;
430    Ok(())
431}
432
433/// Upsert all extracted entities and return the name-to-id map and upsert count.
434async fn upsert_entities(
435    resolver: &crate::graph::EntityResolver<'_>,
436    entities: &[crate::graph::extractor::ExtractedEntity],
437) -> (std::collections::HashMap<String, i64>, usize) {
438    let mut entity_name_to_id: std::collections::HashMap<String, i64> =
439        std::collections::HashMap::new();
440    let mut entities_upserted = 0usize;
441
442    for entity in entities {
443        match resolver
444            .resolve(&entity.name, &entity.entity_type, entity.summary.as_deref())
445            .await
446        {
447            Ok((id, _outcome)) => {
448                entity_name_to_id.insert(entity.name.clone(), id);
449                entities_upserted += 1;
450            }
451            Err(e) => {
452                tracing::debug!("graph: skipping entity {:?}: {e:#}", entity.name);
453            }
454        }
455    }
456
457    (entity_name_to_id, entities_upserted)
458}
459
460/// Insert extracted edges that have both endpoints in `name_to_id`.
461///
462/// Returns the number of edges actually inserted.
463async fn insert_edges(
464    resolver: &crate::graph::EntityResolver<'_>,
465    edges: &[crate::graph::extractor::ExtractedEdge],
466    name_to_id: &std::collections::HashMap<String, i64>,
467    config: &GraphExtractionConfig,
468) -> usize {
469    let mut edges_inserted = 0usize;
470    for edge in edges {
471        let (Some(&src_id), Some(&tgt_id)) =
472            (name_to_id.get(&edge.source), name_to_id.get(&edge.target))
473        else {
474            tracing::debug!(
475                "graph: skipping edge {:?}->{:?}: entity not resolved",
476                edge.source,
477                edge.target
478            );
479            continue;
480        };
481        if src_id == tgt_id {
482            tracing::debug!(
483                "graph: skipping self-loop edge {:?}->{:?} (entity_id={src_id})",
484                edge.source,
485                edge.target
486            );
487            continue;
488        }
489        // Parse LLM-provided edge_type; default to Semantic on any parse failure so
490        // edges are never dropped due to classification errors.
491        let edge_type = edge
492            .edge_type
493            .parse::<crate::graph::EdgeType>()
494            .unwrap_or_else(|_| {
495                tracing::warn!(
496                    raw_type = %edge.edge_type,
497                    "graph: unknown edge_type from LLM, defaulting to semantic"
498                );
499                crate::graph::EdgeType::Semantic
500            });
501        if config.apex_mem_enabled {
502            // APEX-MEM: append-only write path with supersession chains.
503            let relation_trimmed = edge.relation.trim();
504            let relation_display_clean = strip_control_chars(relation_trimmed);
505            let relation_display =
506                truncate_to_bytes_ref(&relation_display_clean, MAX_RELATION_BYTES).to_owned();
507            let canonical_clean = strip_control_chars(&relation_trimmed.to_lowercase());
508            let canonical_relation =
509                truncate_to_bytes_ref(&canonical_clean, MAX_RELATION_BYTES).to_owned();
510            let fact_clean = strip_control_chars(edge.fact.trim());
511            let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
512            match resolver
513                .graph_store()
514                .insert_or_supersede(
515                    src_id,
516                    tgt_id,
517                    &relation_display,
518                    &canonical_relation,
519                    &normalized_fact,
520                    0.8,
521                    None,
522                    edge_type,
523                    true,
524                )
525                .await
526            {
527                Ok(_) => edges_inserted += 1,
528                Err(e) => {
529                    tracing::debug!("graph: skipping edge (apex): {e:#}");
530                }
531            }
532        } else {
533            let belief_cfg =
534                config
535                    .belief_revision_enabled
536                    .then_some(crate::graph::BeliefRevisionConfig {
537                        similarity_threshold: config.belief_revision_similarity_threshold,
538                    });
539            match resolver
540                .resolve_edge_typed(
541                    src_id,
542                    tgt_id,
543                    &edge.relation,
544                    &edge.fact,
545                    0.8,
546                    None,
547                    edge_type,
548                    belief_cfg.as_ref(),
549                )
550                .await
551            {
552                Ok(Some(_)) => edges_inserted += 1,
553                Ok(None) => {} // deduplicated
554                Err(e) => {
555                    tracing::debug!("graph: skipping edge: {e:#}");
556                }
557            }
558        }
559    }
560    edges_inserted
561}
562
563/// Link extracted entities to their GAAMA episode when a conversation ID is configured.
564async fn link_episode(
565    store: &crate::graph::GraphStore,
566    config: &GraphExtractionConfig,
567    entity_ids: &[i64],
568) {
569    let Some(conv_id) = config.conversation_id else {
570        return;
571    };
572    match store.ensure_episode(conv_id).await {
573        Ok(episode_id) => {
574            for &entity_id in entity_ids {
575                if let Err(e) = store.link_entity_to_episode(episode_id, entity_id).await {
576                    tracing::debug!("episode linking skipped for entity {entity_id}: {e:#}");
577                }
578            }
579        }
580        Err(e) => {
581            tracing::warn!("failed to ensure episode for conversation {conv_id}: {e:#}");
582        }
583    }
584}
585
586impl SemanticMemory {
587    /// Spawn background graph extraction for a message. Fire-and-forget — never blocks.
588    ///
589    /// Extraction runs in a separate tokio task with a timeout. Any error or timeout is
590    /// logged and the task exits silently; the agent response is never blocked.
591    ///
592    /// The optional `post_extract_validator` is called after extraction, before upsert.
593    /// It is a generic predicate opaque to zeph-memory (design decision D1).
594    ///
595    /// When `config.note_linking.enabled` is `true` and an embedding store is available,
596    /// `link_memory_notes` runs after successful extraction inside the same task, bounded
597    /// by `config.note_linking.timeout_secs`.
598    pub fn spawn_graph_extraction(
599        &self,
600        content: String,
601        context_messages: Vec<String>,
602        config: GraphExtractionConfig,
603        post_extract_validator: PostExtractValidator,
604        provider_override: Option<AnyProvider>,
605    ) -> tokio::task::JoinHandle<()> {
606        let using_override = provider_override.is_some();
607        let provider = provider_override.unwrap_or_else(|| self.provider.clone());
608        if using_override {
609            tracing::debug!(
610                extract_provider = provider.name(),
611                "graph extraction using override provider (quality_gate bypassed)"
612            );
613        }
614        let ctx = GraphExtractionTaskCtx {
615            pool: self.sqlite.pool().clone(),
616            provider,
617            failure_counter: self.community_detection_failures.clone(),
618            extraction_count: self.graph_extraction_count.clone(),
619            extraction_failures: self.graph_extraction_failures.clone(),
620            embedding_store: self.qdrant.clone(),
621        };
622
623        tokio::spawn(run_graph_extraction_task(
624            content,
625            context_messages,
626            config,
627            post_extract_validator,
628            ctx,
629        ))
630    }
631}
632
/// Owned context bundled for the spawned extraction task.
///
/// Bundles the Arcs that must be cloned before entering `tokio::spawn`.
struct GraphExtractionTaskCtx {
    /// Database pool used for extraction, note linking and community refresh.
    pool: DbPool,
    /// LLM provider used for extraction, embeddings and community detection.
    provider: AnyProvider,
    /// Incremented each time community detection fails.
    failure_counter: Arc<std::sync::atomic::AtomicU64>,
    /// Incremented each time an extraction completes successfully.
    extraction_count: Arc<std::sync::atomic::AtomicU64>,
    /// Incremented each time an extraction fails or times out.
    extraction_failures: Arc<std::sync::atomic::AtomicU64>,
    /// Vector store for entity embeddings; `None` disables note linking.
    embedding_store: Option<Arc<EmbeddingStore>>,
}
644
645/// Body of the spawned graph-extraction task.
646async fn run_graph_extraction_task(
647    content: String,
648    context_messages: Vec<String>,
649    config: GraphExtractionConfig,
650    post_extract_validator: PostExtractValidator,
651    ctx: GraphExtractionTaskCtx,
652) {
653    let timeout_dur = std::time::Duration::from_secs(config.extraction_timeout_secs);
654    let extraction_result = tokio::time::timeout(
655        timeout_dur,
656        extract_and_store(
657            content,
658            context_messages,
659            ctx.provider.clone(),
660            ctx.pool.clone(),
661            config.clone(),
662            post_extract_validator,
663            ctx.embedding_store.clone(),
664        ),
665    )
666    .await;
667
668    let (extraction_ok, new_entity_ids) = match extraction_result {
669        Ok(Ok(result)) => {
670            tracing::debug!(
671                entities = result.stats.entities_upserted,
672                edges = result.stats.edges_inserted,
673                "graph extraction completed"
674            );
675            ctx.extraction_count.fetch_add(1, Ordering::Relaxed);
676            (true, result.entity_ids)
677        }
678        Ok(Err(e)) => {
679            tracing::warn!("graph extraction failed: {e:#}");
680            ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
681            (false, vec![])
682        }
683        Err(_elapsed) => {
684            tracing::warn!("graph extraction timed out");
685            ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
686            (false, vec![])
687        }
688    };
689
690    run_note_linking(
691        extraction_ok,
692        &new_entity_ids,
693        ctx.pool.clone(),
694        ctx.embedding_store,
695        ctx.provider.clone(),
696        &config,
697    )
698    .await;
699
700    maybe_refresh_communities(
701        extraction_ok,
702        ctx.pool,
703        ctx.provider,
704        ctx.failure_counter,
705        &config,
706    )
707    .await;
708}
709
710/// Run A-MEM note linking after successful extraction when enabled.
711async fn run_note_linking(
712    extraction_ok: bool,
713    new_entity_ids: &[i64],
714    pool: DbPool,
715    embedding_store: Option<Arc<EmbeddingStore>>,
716    provider: AnyProvider,
717    config: &GraphExtractionConfig,
718) {
719    if !extraction_ok || !config.note_linking.enabled || new_entity_ids.is_empty() {
720        return;
721    }
722    let Some(store) = embedding_store else {
723        return;
724    };
725    let linking_timeout = std::time::Duration::from_secs(config.note_linking.timeout_secs);
726    match tokio::time::timeout(
727        linking_timeout,
728        link_memory_notes(new_entity_ids, pool, store, provider, &config.note_linking),
729    )
730    .await
731    {
732        Ok(stats) => {
733            tracing::debug!(
734                entities_processed = stats.entities_processed,
735                edges_created = stats.edges_created,
736                "note linking completed"
737            );
738        }
739        Err(_elapsed) => {
740            tracing::debug!("note linking timed out (partial edges may exist)");
741        }
742    }
743}
744
745/// Trigger community detection, graph eviction, and link-weight decay when the extraction
746/// count hits the configured refresh interval.
747async fn maybe_refresh_communities(
748    extraction_ok: bool,
749    pool: DbPool,
750    provider: AnyProvider,
751    failure_counter: Arc<std::sync::atomic::AtomicU64>,
752    config: &GraphExtractionConfig,
753) {
754    use crate::graph::GraphStore;
755
756    if !extraction_ok || config.community_refresh_interval == 0 {
757        return;
758    }
759
760    let store = GraphStore::new(pool.clone());
761    let extraction_count = store.extraction_count().await.unwrap_or(0);
762    if extraction_count == 0
763        || !i64::try_from(config.community_refresh_interval)
764            .is_ok_and(|interval| extraction_count % interval == 0)
765    {
766        return;
767    }
768
769    tracing::info!(extraction_count, "triggering community detection refresh");
770    let store2 = GraphStore::new(pool);
771    let provider2 = provider;
772    let retention_days = config.expired_edge_retention_days;
773    let max_cap = config.max_entities_cap;
774    let max_prompt_bytes = config.community_summary_max_prompt_bytes;
775    let concurrency = config.community_summary_concurrency;
776    let edge_chunk_size = config.lpa_edge_chunk_size;
777    let decay_lambda = config.link_weight_decay_lambda;
778    let decay_interval_secs = config.link_weight_decay_interval_secs;
779    tokio::spawn(async move {
780        match crate::graph::community::detect_communities(
781            &store2,
782            &provider2,
783            max_prompt_bytes,
784            concurrency,
785            edge_chunk_size,
786        )
787        .await
788        {
789            Ok(count) => {
790                tracing::info!(communities = count, "community detection complete");
791            }
792            Err(e) => {
793                tracing::warn!("community detection failed: {e:#}");
794                failure_counter.fetch_add(1, Ordering::Relaxed);
795            }
796        }
797        match crate::graph::community::run_graph_eviction(&store2, retention_days, max_cap).await {
798            Ok(stats) => {
799                tracing::info!(
800                    expired_edges = stats.expired_edges_deleted,
801                    orphan_entities = stats.orphan_entities_deleted,
802                    capped_entities = stats.capped_entities_deleted,
803                    "graph eviction complete"
804                );
805            }
806            Err(e) => {
807                tracing::warn!("graph eviction failed: {e:#}");
808            }
809        }
810
811        // Time-based link weight decay — independent of eviction cycle.
812        if decay_lambda > 0.0 && decay_interval_secs > 0 {
813            let now_secs = std::time::SystemTime::now()
814                .duration_since(std::time::UNIX_EPOCH)
815                .map_or(0, |d| d.as_secs());
816            let last_decay = store2
817                .get_metadata("last_link_weight_decay_at")
818                .await
819                .ok()
820                .flatten()
821                .and_then(|s| s.parse::<u64>().ok())
822                .unwrap_or(0);
823            if now_secs.saturating_sub(last_decay) >= decay_interval_secs {
824                match store2
825                    .decay_edge_retrieval_counts(decay_lambda, decay_interval_secs)
826                    .await
827                {
828                    Ok(affected) => {
829                        tracing::info!(affected, "link weight decay applied");
830                        let _ = store2
831                            .set_metadata("last_link_weight_decay_at", &now_secs.to_string())
832                            .await;
833                    }
834                    Err(e) => {
835                        tracing::warn!("link weight decay failed: {e:#}");
836                    }
837                }
838            }
839        }
840    });
841}
842
843#[cfg(test)]
844mod tests {
845    use std::sync::Arc;
846
847    use zeph_llm::any::AnyProvider;
848
849    use super::extract_and_store;
850    use crate::embedding_store::EmbeddingStore;
851    use crate::graph::GraphStore;
852    use crate::in_memory_store::InMemoryVectorStore;
853    use crate::store::SqliteStore;
854
855    use super::GraphExtractionConfig;
856
857    async fn setup() -> (GraphStore, Arc<EmbeddingStore>) {
858        let sqlite = SqliteStore::new(":memory:").await.unwrap();
859        let pool = sqlite.pool().clone();
860        let mem_store = Box::new(InMemoryVectorStore::new());
861        let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool.clone()));
862        let gs = GraphStore::new(pool);
863        (gs, emb)
864    }
865
866    /// Regression test for #1829: `extract_and_store()` must pass the provider to `EntityResolver`
867    /// so that `store_entity_embedding()` is called and `qdrant_point_id` is set in `SQLite`.
868    #[tokio::test]
869    async fn extract_and_store_sets_qdrant_point_id_when_embedding_store_provided() {
870        let (gs, emb) = setup().await;
871
872        // MockProvider: supports embeddings, returns a valid extraction JSON for chat
873        let extraction_json = r#"{"entities":[{"name":"Rust","type":"language","summary":"systems language"}],"edges":[]}"#;
874        let mut mock =
875            zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
876        mock.supports_embeddings = true;
877        mock.embedding = vec![1.0_f32, 0.0, 0.0, 0.0];
878        let provider = AnyProvider::Mock(mock);
879
880        let config = GraphExtractionConfig {
881            max_entities: 10,
882            max_edges: 10,
883            extraction_timeout_secs: 10,
884            ..Default::default()
885        };
886
887        let result = extract_and_store(
888            "Rust is a systems programming language.".to_owned(),
889            vec![],
890            provider,
891            gs.pool().clone(),
892            config,
893            None,
894            Some(emb.clone()),
895        )
896        .await
897        .unwrap();
898
899        assert_eq!(
900            result.stats.entities_upserted, 1,
901            "one entity should be upserted"
902        );
903
904        // The entity must have a qdrant_point_id — this proves store_entity_embedding() was called.
905        // Before the fix, EntityResolver was built without a provider, so embed() was never called
906        // and qdrant_point_id remained NULL.
907        let entity = gs
908            .find_entity("rust", crate::graph::EntityType::Language)
909            .await
910            .unwrap()
911            .expect("entity 'rust' must exist in SQLite");
912
913        assert!(
914            entity.qdrant_point_id.is_some(),
915            "qdrant_point_id must be set when embedding_store + provider are both provided (regression for #1829)"
916        );
917    }
918
919    /// When no `embedding_store` is provided, `extract_and_store()` must still work correctly
920    /// (no embeddings stored, but entities are still upserted).
921    #[tokio::test]
922    async fn extract_and_store_without_embedding_store_still_upserts_entities() {
923        let (gs, _emb) = setup().await;
924
925        let extraction_json = r#"{"entities":[{"name":"Python","type":"language","summary":"scripting"}],"edges":[]}"#;
926        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
927        let provider = AnyProvider::Mock(mock);
928
929        let config = GraphExtractionConfig {
930            max_entities: 10,
931            max_edges: 10,
932            extraction_timeout_secs: 10,
933            ..Default::default()
934        };
935
936        let result = extract_and_store(
937            "Python is a scripting language.".to_owned(),
938            vec![],
939            provider,
940            gs.pool().clone(),
941            config,
942            None,
943            None, // no embedding_store
944        )
945        .await
946        .unwrap();
947
948        assert_eq!(result.stats.entities_upserted, 1);
949
950        let entity = gs
951            .find_entity("python", crate::graph::EntityType::Language)
952            .await
953            .unwrap()
954            .expect("entity 'python' must exist");
955
956        assert!(
957            entity.qdrant_point_id.is_none(),
958            "qdrant_point_id must remain None when no embedding_store is provided"
959        );
960    }
961
962    /// Regression test for #2166: FTS5 entity writes must be visible to a new connection pool
963    /// opened after extraction completes. Without `checkpoint_wal()` in `extract_and_store`,
964    /// a fresh pool sees stale FTS5 shadow tables and `find_entities_fuzzy` returns empty.
965    #[tokio::test]
966    async fn extract_and_store_fts5_cross_session_visibility() {
967        let file = tempfile::NamedTempFile::new().expect("tempfile");
968        let path = file.path().to_str().expect("valid path").to_string();
969
970        // Session A: run extract_and_store on a file DB (not :memory:) so WAL is used.
971        {
972            let sqlite = crate::store::SqliteStore::new(&path).await.unwrap();
973            let extraction_json = r#"{"entities":[{"name":"Ferris","type":"concept","summary":"Rust mascot"}],"edges":[]}"#;
974            let mock =
975                zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
976            let provider = AnyProvider::Mock(mock);
977            let config = GraphExtractionConfig {
978                max_entities: 10,
979                max_edges: 10,
980                extraction_timeout_secs: 10,
981                ..Default::default()
982            };
983            extract_and_store(
984                "Ferris is the Rust mascot.".to_owned(),
985                vec![],
986                provider,
987                sqlite.pool().clone(),
988                config,
989                None,
990                None,
991            )
992            .await
993            .unwrap();
994        }
995
996        // Session B: new pool — FTS5 must see the entity extracted in session A.
997        let sqlite_b = crate::store::SqliteStore::new(&path).await.unwrap();
998        let gs_b = crate::graph::GraphStore::new(sqlite_b.pool().clone());
999        let results = gs_b.find_entities_fuzzy("Ferris", 10).await.unwrap();
1000        assert!(
1001            !results.is_empty(),
1002            "FTS5 cross-session (#2166): entity extracted in session A must be visible in session B"
1003        );
1004    }
1005
1006    /// Regression test for #2215: self-loop edges (source == target entity) must be silently
1007    /// skipped; no edge row should be inserted.
1008    #[tokio::test]
1009    async fn extract_and_store_skips_self_loop_edges() {
1010        let (gs, _emb) = setup().await;
1011
1012        // LLM returns one entity and one self-loop edge (source == target).
1013        let extraction_json = r#"{
1014            "entities":[{"name":"Rust","type":"language","summary":"systems language"}],
1015            "edges":[{"source":"Rust","target":"Rust","relation":"is","fact":"Rust is Rust","edge_type":"semantic"}]
1016        }"#;
1017        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1018        let provider = AnyProvider::Mock(mock);
1019
1020        let config = GraphExtractionConfig {
1021            max_entities: 10,
1022            max_edges: 10,
1023            extraction_timeout_secs: 10,
1024            ..Default::default()
1025        };
1026
1027        let result = extract_and_store(
1028            "Rust is a language.".to_owned(),
1029            vec![],
1030            provider,
1031            gs.pool().clone(),
1032            config,
1033            None,
1034            None,
1035        )
1036        .await
1037        .unwrap();
1038
1039        assert_eq!(result.stats.entities_upserted, 1);
1040        assert_eq!(
1041            result.stats.edges_inserted, 0,
1042            "self-loop edge must be rejected (#2215)"
1043        );
1044    }
1045
1046    /// When `apex_mem_enabled = true`, edges must be inserted via `insert_or_supersede`
1047    /// (the APEX-MEM append-only path) instead of the legacy `resolve_edge_typed` path.
1048    /// Verifies that edges are still counted as inserted and that the supersession row
1049    /// is created in the database.
1050    #[tokio::test]
1051    async fn apex_mem_path_inserts_edge_via_insert_or_supersede() {
1052        let (gs, _emb) = setup().await;
1053
1054        let extraction_json = r#"{
1055            "entities":[
1056                {"name":"Alice","type":"person","summary":"a person"},
1057                {"name":"Bob","type":"person","summary":"another person"}
1058            ],
1059            "edges":[
1060                {"source":"Alice","target":"Bob","relation":"KNOWS","fact":"Alice knows Bob","edge_type":"semantic"}
1061            ]
1062        }"#;
1063        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1064        let provider = AnyProvider::Mock(mock);
1065
1066        let config = GraphExtractionConfig {
1067            max_entities: 10,
1068            max_edges: 10,
1069            extraction_timeout_secs: 10,
1070            apex_mem_enabled: true,
1071            ..Default::default()
1072        };
1073
1074        let result = extract_and_store(
1075            "Alice knows Bob.".to_owned(),
1076            vec![],
1077            provider,
1078            gs.pool().clone(),
1079            config,
1080            None,
1081            None,
1082        )
1083        .await
1084        .unwrap();
1085
1086        assert_eq!(result.stats.entities_upserted, 2, "two entities expected");
1087        assert_eq!(
1088            result.stats.edges_inserted, 1,
1089            "APEX-MEM path must insert the edge and count it (#3631)"
1090        );
1091
1092        // Verify the edge row exists and its relation preserves display casing.
1093        let alice_id = gs
1094            .find_entity("alice", crate::graph::EntityType::Person)
1095            .await
1096            .unwrap()
1097            .expect("entity 'alice' must exist")
1098            .id;
1099        let bob_id = gs
1100            .find_entity("bob", crate::graph::EntityType::Person)
1101            .await
1102            .unwrap()
1103            .expect("entity 'bob' must exist")
1104            .id;
1105        let edges = gs.edges_exact(alice_id, bob_id).await.unwrap();
1106        assert_eq!(edges.len(), 1, "exactly one edge expected");
1107        // canonical_relation is lowercased; relation field preserves original casing post-strip
1108        assert_eq!(
1109            edges[0].relation, "KNOWS",
1110            "display relation must preserve original casing"
1111        );
1112    }
1113}