Skip to main content

zeph_memory/semantic/
graph.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::sync::Arc;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use std::sync::atomic::Ordering;
9use tokio_util::sync::CancellationToken;
10use zeph_db::DbPool;
11
12pub use zeph_common::config::memory::NoteLinkingConfig;
13use zeph_common::sanitize::strip_control_chars;
14use zeph_common::text::truncate_to_bytes_ref;
15use zeph_llm::any::AnyProvider;
16use zeph_llm::provider::LlmProvider as _;
17
18use crate::embedding_store::EmbeddingStore;
19use crate::error::MemoryError;
20use crate::graph::extractor::ExtractionResult as ExtractorResult;
21use crate::vector_store::VectorFilter;
22
23use super::SemanticMemory;
24
/// Callback type for post-extraction validation.
///
/// A generic predicate opaque to zeph-memory — callers (zeph-core) provide security
/// validation without introducing a dependency on security policy in this crate.
///
/// `None` means no validation runs. When a callback is present it receives the raw
/// extractor output; if it returns `Err(reason)`, `extract_and_store` logs the reason at
/// `warn` level and returns an empty result without persisting anything from that pass.
pub type PostExtractValidator = Option<Box<dyn Fn(&ExtractorResult) -> Result<(), String> + Send>>;
30
/// Config for the spawned background extraction task.
///
/// Owned clone of the relevant fields from `GraphConfig` — no references, safe to send to
/// spawned tasks.
#[derive(Debug, Clone)]
pub struct GraphExtractionConfig {
    /// Entity limit handed to `GraphExtractor::new` in `extract_and_store`.
    pub max_entities: usize,
    /// Edge limit handed to `GraphExtractor::new` in `extract_and_store`.
    pub max_edges: usize,
    /// Timeout, in seconds, wrapped around the whole `extract_and_store` call by the
    /// spawned extraction task.
    pub extraction_timeout_secs: u64,
    /// Presumably the number of extractions between community refreshes — TODO confirm
    /// against `maybe_refresh_communities`.
    pub community_refresh_interval: usize,
    /// NOTE(review): not read in the extraction / note-linking code of this module;
    /// presumably consumed by the community-refresh maintenance pass — confirm.
    pub expired_edge_retention_days: u32,
    /// NOTE(review): not read in the extraction / note-linking code of this module;
    /// presumably consumed by the community-refresh maintenance pass — confirm.
    pub max_entities_cap: usize,
    /// NOTE(review): not read in the extraction / note-linking code of this module;
    /// presumably consumed by the community-refresh maintenance pass — confirm.
    pub community_summary_max_prompt_bytes: usize,
    /// NOTE(review): not read in the extraction / note-linking code of this module;
    /// presumably consumed by the community-refresh maintenance pass — confirm.
    pub community_summary_concurrency: usize,
    /// NOTE(review): not read in the extraction / note-linking code of this module;
    /// presumably consumed by the community-refresh maintenance pass — confirm.
    pub lpa_edge_chunk_size: usize,
    /// A-MEM note linking config, cloned from `GraphConfig.note_linking`.
    pub note_linking: NoteLinkingConfig,
    /// A-MEM link weight decay lambda. Range: `(0.0, 1.0]`. Default: `0.95`.
    pub link_weight_decay_lambda: f64,
    /// Seconds between link weight decay passes. Default: `86400`.
    pub link_weight_decay_interval_secs: u64,
    /// Kumiho belief revision: enable semantic contradiction detection for edges.
    pub belief_revision_enabled: bool,
    /// Cosine similarity threshold for belief revision contradiction detection.
    pub belief_revision_similarity_threshold: f32,
    /// GAAMA episode linking: `conversation_id` to link extracted entities to their episode.
    /// `None` disables episode linking for this extraction pass.
    pub conversation_id: Option<i64>,
    /// APEX-MEM: use `insert_or_supersede` instead of `resolve_edge_typed`. Default: `false`.
    pub apex_mem_enabled: bool,
    /// LLM call timeout for extraction, in seconds. Default: `30`.
    pub llm_timeout_secs: u64,
    /// Per-call timeout for every `embed()` invocation, in seconds. Default: `5`.
    pub embed_timeout_secs: u64,
    /// Turn index within the episode for edges inserted during this extraction pass (#3710).
    ///
    /// `None` disables turn-level provenance recording for this pass.
    pub turn_index: Option<u32>,
    /// `MemORAI` write-gate prefilter: minimum confidence for low-signal-relation edges (#3709).
    ///
    /// `None` disables the gate (default behaviour, always writes).
    pub write_gate_min_relevance: Option<f32>,
    /// Benna-Fusi fast-variable learning rate for confidence merges (#4711).
    ///
    /// Passed to [`crate::graph::GraphStore::with_benna_rates`]. Default: `0.5`.
    pub benna_fast_rate: f32,
    /// Benna-Fusi slow-variable learning rate for confidence merges (#4711).
    ///
    /// Passed to [`crate::graph::GraphStore::with_benna_rates`]. Default: `0.05`.
    pub benna_slow_rate: f32,
}
82
/// Baseline configuration: every optional feature is off and all size limits are zero.
///
/// NOTE(review): `extraction_timeout_secs` defaults to `0`, which the spawned extraction
/// task turns into a zero-length `tokio::time::timeout`. Callers presumably override the
/// zeroed limits and timeouts via struct-update syntax — confirm.
impl Default for GraphExtractionConfig {
    fn default() -> Self {
        Self {
            // Limits and timeouts without a meaningful built-in default.
            max_entities: 0,
            max_edges: 0,
            extraction_timeout_secs: 0,
            community_refresh_interval: 0,
            expired_edge_retention_days: 0,
            max_entities_cap: 0,
            community_summary_max_prompt_bytes: 0,
            community_summary_concurrency: 0,
            lpa_edge_chunk_size: 0,
            note_linking: NoteLinkingConfig::default(),
            // Values below match the defaults documented on the struct fields.
            link_weight_decay_lambda: 0.95,
            link_weight_decay_interval_secs: 86400,
            belief_revision_enabled: false,
            belief_revision_similarity_threshold: 0.85,
            conversation_id: None,
            apex_mem_enabled: false,
            llm_timeout_secs: 30,
            embed_timeout_secs: 5,
            turn_index: None,
            write_gate_min_relevance: None,
            benna_fast_rate: 0.5,
            benna_slow_rate: 0.05,
        }
    }
}
111
/// Stats returned from a completed extraction.
#[derive(Debug, Default)]
pub struct ExtractionStats {
    /// Number of extracted entities the resolver resolved successfully; failed entities
    /// are logged and not counted.
    pub entities_upserted: usize,
    /// Number of edges actually written; skipped, deduplicated, and failed edges are not
    /// counted.
    pub edges_inserted: usize,
}
118
/// Result returned from `extract_and_store`, combining stats with entity IDs needed for linking.
///
/// The `Default` value (zero counts, no IDs) is what `extract_and_store` returns when the
/// extractor yields nothing or the post-extraction validator rejects the output.
#[derive(Debug, Default)]
pub struct ExtractionResult {
    /// Entity and edge counters for this pass.
    pub stats: ExtractionStats,
    /// IDs of entities upserted during this extraction pass. Passed to `link_memory_notes`.
    pub entity_ids: Vec<i64>,
}
126
/// Stats returned from a completed note-linking pass.
#[derive(Debug, Default)]
pub struct LinkingStats {
    /// Entities that were embedded and whose similarity search succeeded. Entities that
    /// failed to load, embed, or search are not counted.
    pub entities_processed: usize,
    /// `similar_to` edges for which `insert_edge` returned `Ok` during the pass.
    pub edges_created: usize,
}
133
/// Qdrant collection name for entity embeddings (mirrors the constant in `resolver.rs`).
///
/// This is the collection queried by the note-linking similarity search.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Mirrors the constant from `graph/resolver/mod.rs` — used for sanitizing APEX-MEM inputs.
///
/// Upper bound, in bytes, applied to both the display and canonical relation strings.
const MAX_RELATION_BYTES: usize = 256;
/// Mirrors the constant from `graph/resolver/mod.rs` — used for sanitizing APEX-MEM inputs.
///
/// Upper bound, in bytes, applied to the edge fact text.
const MAX_FACT_BYTES: usize = 2048;
/// Fallback confidence used when the LLM omits the `confidence` field in an extracted edge.
const DEFAULT_EDGE_CONFIDENCE: f32 = 0.8;
143
/// Work item for a single entity during a note-linking pass.
struct EntityWorkItem {
    /// Graph-store ID of the entity being linked.
    entity_id: i64,
    /// Canonical entity name; used only in diagnostics once `embed_text` is built.
    canonical_name: String,
    /// Text sent to the embedding provider: `"<name>: <summary>"` when a non-empty
    /// summary exists, otherwise just the canonical name.
    embed_text: String,
    /// The entity's own `qdrant_point_id`, if it has one; used to drop the self-match
    /// from similarity search results.
    self_point_id: Option<String>,
}
151
152/// Link newly extracted entities to semantically similar entities in the graph.
153///
154/// For each entity in `entity_ids`:
155/// 1. Load the entity name + summary from `SQLite`.
156/// 2. Embed all entity texts in parallel.
157/// 3. Search the entity embedding collection in parallel for the `top_k + 1` most similar points.
158/// 4. Filter out the entity itself (by `qdrant_point_id` or `entity_id` payload) and points
159///    below `similarity_threshold`.
160/// 5. Insert a unidirectional `similar_to` edge where `source_id < target_id` to avoid
161///    double-counting in BFS recall while still being traversable via the OR clause in
162///    `edges_for_entity`. The edge confidence is set to the cosine similarity score.
163/// 6. Deduplicate pairs within a single pass so that a pair encountered from both A→B and B→A
164///    directions is only inserted once, keeping `edges_created` accurate.
165///
166/// Errors are logged and not propagated — this is a best-effort background enrichment step.
167pub async fn link_memory_notes(
168    entity_ids: &[i64],
169    pool: DbPool,
170    embedding_store: Arc<EmbeddingStore>,
171    provider: AnyProvider,
172    cfg: &NoteLinkingConfig,
173) -> LinkingStats {
174    use crate::graph::GraphStore;
175
176    let store = GraphStore::new(pool);
177    let mut stats = LinkingStats::default();
178
179    let work_items = collect_note_link_work_items(entity_ids, &store).await;
180    if work_items.is_empty() {
181        return stats;
182    }
183
184    let valid = embed_work_items(&work_items, &provider, cfg).await;
185
186    let search_limit = cfg.top_k + 1; // +1 to account for self-match
187    let search_results = search_similar_for_items(&valid, &embedding_store, search_limit).await;
188
189    insert_similarity_edges(
190        &work_items,
191        &valid,
192        &search_results,
193        cfg,
194        &store,
195        &mut stats,
196    )
197    .await;
198
199    stats
200}
201
202/// Phase 1: load entities from the DB and build work items for embedding.
203///
204/// Processes entities sequentially to avoid connection-pool contention.
205async fn collect_note_link_work_items(
206    entity_ids: &[i64],
207    store: &crate::graph::GraphStore,
208) -> Vec<EntityWorkItem> {
209    let mut work_items: Vec<EntityWorkItem> = Vec::with_capacity(entity_ids.len());
210    for &entity_id in entity_ids {
211        let entity = match store.find_entity_by_id(entity_id).await {
212            Ok(Some(e)) => e,
213            Ok(None) => {
214                tracing::debug!("note_linking: entity {entity_id} not found, skipping");
215                continue;
216            }
217            Err(e) => {
218                tracing::debug!("note_linking: DB error loading entity {entity_id}: {e:#}");
219                continue;
220            }
221        };
222        let embed_text = match &entity.summary {
223            Some(s) if !s.is_empty() => format!("{}: {s}", entity.canonical_name),
224            _ => entity.canonical_name.clone(),
225        };
226        work_items.push(EntityWorkItem {
227            entity_id,
228            canonical_name: entity.canonical_name,
229            embed_text,
230            self_point_id: entity.qdrant_point_id,
231        });
232    }
233    work_items
234}
235
236/// Phase 2: embed all entity texts in parallel.
237///
238/// Returns `(work_idx, embedding)` pairs for successfully embedded items.
239/// Items that fail to embed are logged and dropped.
240async fn embed_work_items(
241    work_items: &[EntityWorkItem],
242    provider: &AnyProvider,
243    cfg: &NoteLinkingConfig,
244) -> Vec<(usize, Vec<f32>)> {
245    use futures::future;
246
247    let Ok(embed_results) = tokio::time::timeout(
248        std::time::Duration::from_secs(cfg.timeout_secs),
249        future::join_all(work_items.iter().map(|w| provider.embed(&w.embed_text))),
250    )
251    .await
252    else {
253        tracing::warn!(
254            count = work_items.len(),
255            "note_linking: batch embed timed out — skipping all entities"
256        );
257        return Vec::new();
258    };
259    embed_results
260        .into_iter()
261        .enumerate()
262        .filter_map(|(i, r)| match r {
263            Ok(v) => Some((i, v)),
264            Err(e) => {
265                tracing::debug!(
266                    "note_linking: embed failed for entity {:?}: {e:#}",
267                    work_items[i].canonical_name
268                );
269                None
270            }
271        })
272        .collect()
273}
274
275/// Phase 3: search the embedding store for similar entities for each embedded work item.
276async fn search_similar_for_items(
277    valid: &[(usize, Vec<f32>)],
278    embedding_store: &EmbeddingStore,
279    search_limit: usize,
280) -> Vec<Result<Vec<crate::ScoredVectorPoint>, MemoryError>> {
281    use futures::future;
282
283    future::join_all(valid.iter().map(|(_, vec)| {
284        embedding_store.search_collection(
285            ENTITY_COLLECTION,
286            vec,
287            search_limit,
288            None::<VectorFilter>,
289        )
290    }))
291    .await
292}
293
294/// Phase 4: insert similarity edges, deduplicating pairs seen from both A→B and B→A.
295///
296/// Without deduplication, both directions would call `insert_edge` for the same normalised
297/// pair and both return `Ok`, inflating `edges_created` by the number of bidirectional hits.
298async fn insert_similarity_edges(
299    work_items: &[EntityWorkItem],
300    valid: &[(usize, Vec<f32>)],
301    search_results: &[Result<Vec<crate::ScoredVectorPoint>, MemoryError>],
302    cfg: &NoteLinkingConfig,
303    store: &crate::graph::GraphStore,
304    stats: &mut LinkingStats,
305) {
306    let mut seen_pairs = std::collections::HashSet::new();
307
308    for ((work_idx, _), search_result) in valid.iter().zip(search_results.iter()) {
309        let w = &work_items[*work_idx];
310
311        let results = match search_result {
312            Ok(r) => r,
313            Err(e) => {
314                tracing::debug!(
315                    "note_linking: search failed for entity {:?}: {e:#}",
316                    w.canonical_name
317                );
318                continue;
319            }
320        };
321
322        stats.entities_processed += 1;
323
324        let self_point_id = w.self_point_id.as_deref();
325        let candidates = results
326            .iter()
327            .filter(|p| Some(p.id.as_str()) != self_point_id && p.score >= cfg.similarity_threshold)
328            .take(cfg.top_k);
329
330        for point in candidates {
331            let Some(target_id) = point
332                .payload
333                .get("entity_id")
334                .and_then(serde_json::Value::as_i64)
335            else {
336                tracing::debug!(
337                    "note_linking: missing entity_id in payload for point {}",
338                    point.id
339                );
340                continue;
341            };
342
343            if target_id == w.entity_id {
344                continue; // secondary self-guard when qdrant_point_id is null
345            }
346
347            // Normalise direction: always store source_id < target_id.
348            let (src, tgt) = if w.entity_id < target_id {
349                (w.entity_id, target_id)
350            } else {
351                (target_id, w.entity_id)
352            };
353
354            if !seen_pairs.insert((src, tgt)) {
355                continue;
356            }
357
358            let fact = format!("Semantically similar entities (score: {:.3})", point.score);
359
360            match store
361                .insert_edge(src, tgt, "similar_to", &fact, point.score, None)
362                .await
363            {
364                Ok(_) => stats.edges_created += 1,
365                Err(e) => {
366                    tracing::debug!("note_linking: insert_edge failed: {e:#}");
367                }
368            }
369        }
370    }
371}
372
373/// Extract entities and edges from `content` and persist them to the graph store.
374///
375/// This function runs inside a spawned task — it receives owned data only.
376///
377/// The optional `embedding_store` enables entity embedding storage in Qdrant, which is
378/// required for A-MEM note linking to find semantically similar entities across sessions.
379///
380/// # Errors
381///
382/// Returns an error if the database query fails or LLM extraction fails.
383#[cfg_attr(
384    feature = "profiling",
385    tracing::instrument(name = "memory.graph_extract", skip_all, fields(entities = tracing::field::Empty, edges = tracing::field::Empty))
386)]
387pub async fn extract_and_store(
388    content: String,
389    context_messages: Vec<String>,
390    provider: AnyProvider,
391    pool: DbPool,
392    config: GraphExtractionConfig,
393    post_extract_validator: PostExtractValidator,
394    embedding_store: Option<Arc<EmbeddingStore>>,
395) -> Result<ExtractionResult, MemoryError> {
396    use crate::graph::{EntityResolver, GraphExtractor, GraphStore};
397
398    let extractor = GraphExtractor::new(
399        provider.clone(),
400        config.max_entities,
401        config.max_edges,
402        config.llm_timeout_secs,
403    );
404    let ctx_refs: Vec<&str> = context_messages.iter().map(String::as_str).collect();
405
406    let store =
407        GraphStore::new(pool).with_benna_rates(config.benna_fast_rate, config.benna_slow_rate);
408
409    bump_extraction_count(store.pool()).await?;
410
411    let Some(result) = extractor.extract(&content, &ctx_refs).await? else {
412        return Ok(ExtractionResult::default());
413    };
414
415    // Post-extraction validation callback. zeph-memory does not know the callback is a
416    // security validator — it is a generic predicate opaque to this crate (design decision D1).
417    if let Some(ref validator) = post_extract_validator
418        && let Err(reason) = validator(&result)
419    {
420        tracing::warn!(
421            reason,
422            "graph extraction validation failed, skipping upsert"
423        );
424        return Ok(ExtractionResult::default());
425    }
426
427    let resolver = if let Some(ref emb) = embedding_store {
428        EntityResolver::new(&store)
429            .with_embedding_store(emb)
430            .with_provider(&provider)
431            .with_embed_timeout(config.embed_timeout_secs)
432    } else {
433        EntityResolver::new(&store).with_embed_timeout(config.embed_timeout_secs)
434    };
435
436    let (entity_name_to_id, entities_upserted) = upsert_entities(&resolver, &result.entities).await;
437    let edges_inserted = insert_edges(&resolver, &result.edges, &entity_name_to_id, &config).await;
438
439    #[cfg(any(feature = "sqlite", feature = "postgres"))]
440    store.checkpoint_wal().await?;
441
442    let new_entity_ids: Vec<i64> = entity_name_to_id.into_values().collect();
443
444    link_episode(&store, &config, &new_entity_ids).await;
445
446    #[cfg(feature = "profiling")]
447    {
448        let span = tracing::Span::current();
449        span.record("entities", entities_upserted);
450        span.record("edges", edges_inserted);
451    }
452
453    Ok(ExtractionResult {
454        stats: ExtractionStats {
455            entities_upserted,
456            edges_inserted,
457        },
458        entity_ids: new_entity_ids,
459    })
460}
461
/// Increment the extraction counter in `graph_metadata`.
///
/// Runs two statements against `pool`: the first seeds the `extraction_count` row with
/// `'0'` if it does not exist yet, the second adds one to the stored value. The counter is
/// kept as text, hence the integer round-trip cast in the update.
///
/// NOTE(review): the two statements are executed separately, not inside an explicit
/// transaction in this function.
///
/// # Errors
///
/// Returns an error if either statement fails.
async fn bump_extraction_count(pool: &DbPool) -> Result<(), MemoryError> {
    // Seed the row on first use; a no-op when the key already exists.
    zeph_db::query(sql!(
        "INSERT INTO graph_metadata (key, value) VALUES ('extraction_count', '0')
         ON CONFLICT(key) DO NOTHING"
    ))
    .execute(pool)
    .await?;
    // Increment in SQL so the read-modify-write happens inside a single statement.
    zeph_db::query(sql!(
        "UPDATE graph_metadata
         SET value = CAST(CAST(value AS INTEGER) + 1 AS TEXT)
         WHERE key = 'extraction_count'"
    ))
    .execute(pool)
    .await?;
    Ok(())
}
479
480/// Upsert all extracted entities and return the name-to-id map and upsert count.
481async fn upsert_entities(
482    resolver: &crate::graph::EntityResolver<'_>,
483    entities: &[crate::graph::extractor::ExtractedEntity],
484) -> (std::collections::HashMap<String, i64>, usize) {
485    let mut entity_name_to_id: std::collections::HashMap<String, i64> =
486        std::collections::HashMap::new();
487    let mut entities_upserted = 0usize;
488
489    for entity in entities {
490        match resolver
491            .resolve(&entity.name, &entity.entity_type, entity.summary.as_deref())
492            .await
493        {
494            Ok((id, _outcome)) => {
495                entity_name_to_id.insert(entity.name.clone(), id);
496                entities_upserted += 1;
497            }
498            Err(e) => {
499                tracing::debug!("graph: skipping entity {:?}: {e:#}", entity.name);
500            }
501        }
502    }
503
504    (entity_name_to_id, entities_upserted)
505}
506
/// Returns `true` when `relation` is a generic, low-information connector.
///
/// Used by the `MemORAI` write-gate to avoid storing vacuous edges (#3709).
///
/// Matching ignores ASCII case and any leading or trailing whitespace, so a padded
/// relation such as `" Related_To\n"` is treated the same as `"related_to"`. Only whole
/// relations match; `"has a"` is not low-signal just because it starts with `"has"`.
fn is_low_signal_relation(relation: &str) -> bool {
    const LOW_SIGNAL: &[&str] = &[
        "related_to",
        "related to",
        "is related to",
        "associated_with",
        "associated with",
        "has",
        "have",
        "is",
        "are",
        "mentions",
        "mentioned",
        "involves",
        "involved",
    ];
    // Relations come straight from LLM output and may carry stray whitespace.
    let relation = relation.trim();
    LOW_SIGNAL.iter().any(|&s| relation.eq_ignore_ascii_case(s))
}
528
529/// Insert extracted edges that have both endpoints in `name_to_id`.
530///
531/// Returns the number of edges actually inserted.
532#[allow(clippy::too_many_lines)]
533async fn insert_edges(
534    resolver: &crate::graph::EntityResolver<'_>,
535    edges: &[crate::graph::extractor::ExtractedEdge],
536    name_to_id: &std::collections::HashMap<String, i64>,
537    config: &GraphExtractionConfig,
538) -> usize {
539    let mut edges_inserted = 0usize;
540    for edge in edges {
541        // MemORAI write-gate: drop low-signal edges below the relevance threshold (#3709).
542        if let Some(min_rel) = config.write_gate_min_relevance {
543            let conf = edge.confidence.unwrap_or(1.0);
544            if conf < min_rel && is_low_signal_relation(&edge.relation) {
545                tracing::debug!(
546                    relation = %edge.relation,
547                    confidence = conf,
548                    threshold = min_rel,
549                    "write-gate: skipping low-signal edge"
550                );
551                continue;
552            }
553        }
554        let (Some(&src_id), Some(&tgt_id)) =
555            (name_to_id.get(&edge.source), name_to_id.get(&edge.target))
556        else {
557            tracing::debug!(
558                "graph: skipping edge {:?}->{:?}: entity not resolved",
559                edge.source,
560                edge.target
561            );
562            continue;
563        };
564        if src_id == tgt_id {
565            tracing::debug!(
566                "graph: skipping self-loop edge {:?}->{:?} (entity_id={src_id})",
567                edge.source,
568                edge.target
569            );
570            continue;
571        }
572        // Parse LLM-provided edge_type; default to Semantic on any parse failure so
573        // edges are never dropped due to classification errors.
574        let edge_type = edge
575            .edge_type
576            .parse::<crate::graph::EdgeType>()
577            .unwrap_or_else(|_| {
578                tracing::warn!(
579                    raw_type = %edge.edge_type,
580                    "graph: unknown edge_type from LLM, defaulting to semantic"
581                );
582                crate::graph::EdgeType::Semantic
583            });
584        if config.apex_mem_enabled {
585            // APEX-MEM: append-only write path with supersession chains.
586            let relation_trimmed = edge.relation.trim();
587            let relation_display_clean = strip_control_chars(relation_trimmed);
588            let relation_display =
589                truncate_to_bytes_ref(&relation_display_clean, MAX_RELATION_BYTES).to_owned();
590            let canonical_clean = strip_control_chars(&relation_trimmed.to_lowercase());
591            let canonical_relation =
592                truncate_to_bytes_ref(&canonical_clean, MAX_RELATION_BYTES).to_owned();
593            let fact_clean = strip_control_chars(edge.fact.trim());
594            let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
595            match resolver
596                .graph_store()
597                .insert_or_supersede_with_turn_index_and_metrics(
598                    src_id,
599                    tgt_id,
600                    &relation_display,
601                    &canonical_relation,
602                    &normalized_fact,
603                    edge.confidence.unwrap_or(DEFAULT_EDGE_CONFIDENCE),
604                    None,
605                    edge_type,
606                    true,
607                    None,
608                    config.turn_index,
609                )
610                .await
611            {
612                Ok(_) => edges_inserted += 1,
613                Err(e) => {
614                    tracing::debug!("graph: skipping edge (apex): {e:#}");
615                }
616            }
617        } else {
618            let belief_cfg =
619                config
620                    .belief_revision_enabled
621                    .then_some(crate::graph::BeliefRevisionConfig {
622                        similarity_threshold: config.belief_revision_similarity_threshold,
623                    });
624            match resolver
625                .resolve_edge_typed(
626                    src_id,
627                    tgt_id,
628                    &edge.relation,
629                    &edge.fact,
630                    edge.confidence.unwrap_or(DEFAULT_EDGE_CONFIDENCE),
631                    None,
632                    edge_type,
633                    belief_cfg.as_ref(),
634                )
635                .await
636            {
637                Ok(Some(_)) => edges_inserted += 1,
638                Ok(None) => {} // deduplicated
639                Err(e) => {
640                    tracing::debug!("graph: skipping edge: {e:#}");
641                }
642            }
643        }
644    }
645    edges_inserted
646}
647
648/// Link extracted entities to their GAAMA episode when a conversation ID is configured.
649async fn link_episode(
650    store: &crate::graph::GraphStore,
651    config: &GraphExtractionConfig,
652    entity_ids: &[i64],
653) {
654    let Some(conv_id) = config.conversation_id else {
655        return;
656    };
657    match store.ensure_episode(conv_id).await {
658        Ok(episode_id) => {
659            for &entity_id in entity_ids {
660                if let Err(e) = store.link_entity_to_episode(episode_id, entity_id).await {
661                    tracing::debug!("episode linking skipped for entity {entity_id}: {e:#}");
662                }
663            }
664        }
665        Err(e) => {
666            tracing::warn!("failed to ensure episode for conversation {conv_id}: {e:#}");
667        }
668    }
669}
670
671impl SemanticMemory {
672    /// Spawn background graph extraction for a message. Fire-and-forget — never blocks.
673    ///
674    /// Extraction runs in a separate tokio task with a timeout. Any error or timeout is
675    /// logged and the task exits silently; the agent response is never blocked.
676    ///
677    /// The optional `post_extract_validator` is called after extraction, before upsert.
678    /// It is a generic predicate opaque to zeph-memory (design decision D1).
679    ///
680    /// When `config.note_linking.enabled` is `true` and an embedding store is available,
681    /// `link_memory_notes` runs after successful extraction inside the same task, bounded
682    /// by `config.note_linking.timeout_secs`.
683    ///
684    /// # Panics
685    ///
686    /// Panics if the internal `graph_cancel` mutex is poisoned (another thread panicked
687    /// while holding the lock).
688    pub fn spawn_graph_extraction(
689        &self,
690        content: String,
691        context_messages: Vec<String>,
692        config: GraphExtractionConfig,
693        post_extract_validator: PostExtractValidator,
694        provider_override: Option<AnyProvider>,
695        cancel: CancellationToken,
696    ) -> tokio::task::JoinHandle<()> {
697        let using_override = provider_override.is_some();
698        let provider = provider_override.unwrap_or_else(|| self.provider.clone());
699        if using_override {
700            tracing::debug!(
701                extract_provider = provider.name(),
702                "graph extraction using override provider (quality_gate bypassed)"
703            );
704        }
705        *self
706            .graph_cancel
707            .lock()
708            .expect("graph_cancel mutex poisoned") = Some(cancel.clone());
709
710        let ctx = GraphExtractionTaskCtx {
711            pool: self.sqlite.pool().clone(),
712            provider,
713            failure_counter: self.community_detection_failures.clone(),
714            extraction_count: self.graph_extraction_count.clone(),
715            extraction_failures: self.graph_extraction_failures.clone(),
716            embedding_store: self.qdrant.clone(),
717            cancel,
718        };
719
720        tokio::spawn(run_graph_extraction_task(
721            content,
722            context_messages,
723            config,
724            post_extract_validator,
725            ctx,
726        ))
727    }
728
729    /// Signal cooperative cancellation to the current background graph-extraction task.
730    ///
731    /// Fires the [`CancellationToken`] stored by the most recent [`spawn_graph_extraction`]
732    /// call. The task checks the token at community-refresh boundaries, so it exits cleanly
733    /// rather than being hard-aborted. This should be called before the supervisor calls
734    /// `abort()` on the underlying `JoinHandle` to give the task a chance to flush state.
735    ///
736    /// No-op if no extraction has been spawned or the previous token has already fired.
737    ///
738    /// # Panics
739    ///
740    /// Panics if the internal `graph_cancel` mutex is poisoned (another thread panicked
741    /// while holding the lock).
742    ///
743    /// [`spawn_graph_extraction`]: SemanticMemory::spawn_graph_extraction
744    pub fn cancel_graph_extraction(&self) {
745        if let Some(token) = self
746            .graph_cancel
747            .lock()
748            .expect("graph_cancel mutex poisoned")
749            .as_ref()
750        {
751            token.cancel();
752        }
753    }
754}
755
/// Owned context bundled for the spawned extraction task.
///
/// Bundles the Arcs that must be cloned before entering `tokio::spawn`.
struct GraphExtractionTaskCtx {
    /// Database pool used for extraction, note linking, and community refresh.
    pool: DbPool,
    /// LLM/embedding provider for this pass (the override provider when one was supplied).
    provider: AnyProvider,
    /// Community-detection failure counter, handed to `maybe_refresh_communities`.
    failure_counter: Arc<std::sync::atomic::AtomicU64>,
    /// Incremented once for every extraction that completes successfully.
    extraction_count: Arc<std::sync::atomic::AtomicU64>,
    /// Incremented once for every extraction that fails or times out.
    extraction_failures: Arc<std::sync::atomic::AtomicU64>,
    /// Embedding store, if configured; without it note linking is skipped.
    embedding_store: Option<Arc<EmbeddingStore>>,
    /// Cancellation signal propagated into background sub-tasks (community refresh).
    cancel: CancellationToken,
}
769
770/// Body of the spawned graph-extraction task.
771async fn run_graph_extraction_task(
772    content: String,
773    context_messages: Vec<String>,
774    config: GraphExtractionConfig,
775    post_extract_validator: PostExtractValidator,
776    ctx: GraphExtractionTaskCtx,
777) {
778    let timeout_dur = std::time::Duration::from_secs(config.extraction_timeout_secs);
779    let extraction_result = tokio::time::timeout(
780        timeout_dur,
781        extract_and_store(
782            content,
783            context_messages,
784            ctx.provider.clone(),
785            ctx.pool.clone(),
786            config.clone(),
787            post_extract_validator,
788            ctx.embedding_store.clone(),
789        ),
790    )
791    .await;
792
793    let (extraction_ok, new_entity_ids) = match extraction_result {
794        Ok(Ok(result)) => {
795            tracing::debug!(
796                entities = result.stats.entities_upserted,
797                edges = result.stats.edges_inserted,
798                "graph extraction completed"
799            );
800            ctx.extraction_count.fetch_add(1, Ordering::Relaxed);
801            (true, result.entity_ids)
802        }
803        Ok(Err(e)) => {
804            tracing::warn!("graph extraction failed: {e:#}");
805            ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
806            (false, vec![])
807        }
808        Err(_elapsed) => {
809            tracing::warn!("graph extraction timed out");
810            ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
811            (false, vec![])
812        }
813    };
814
815    run_note_linking(
816        extraction_ok,
817        &new_entity_ids,
818        ctx.pool.clone(),
819        ctx.embedding_store,
820        ctx.provider.clone(),
821        &config,
822    )
823    .await;
824
825    maybe_refresh_communities(
826        extraction_ok,
827        ctx.pool,
828        ctx.provider,
829        ctx.failure_counter,
830        &config,
831        ctx.cancel,
832    )
833    .await;
834}
835
836/// Run A-MEM note linking after successful extraction when enabled.
837async fn run_note_linking(
838    extraction_ok: bool,
839    new_entity_ids: &[i64],
840    pool: DbPool,
841    embedding_store: Option<Arc<EmbeddingStore>>,
842    provider: AnyProvider,
843    config: &GraphExtractionConfig,
844) {
845    if !extraction_ok || !config.note_linking.enabled || new_entity_ids.is_empty() {
846        return;
847    }
848    let Some(store) = embedding_store else {
849        return;
850    };
851    let linking_timeout = std::time::Duration::from_secs(config.note_linking.timeout_secs);
852    match tokio::time::timeout(
853        linking_timeout,
854        link_memory_notes(new_entity_ids, pool, store, provider, &config.note_linking),
855    )
856    .await
857    {
858        Ok(stats) => {
859            tracing::debug!(
860                entities_processed = stats.entities_processed,
861                edges_created = stats.edges_created,
862                "note linking completed"
863            );
864        }
865        Err(_elapsed) => {
866            tracing::debug!("note linking timed out (partial edges may exist)");
867        }
868    }
869}
870
871/// Trigger community detection, graph eviction, and link-weight decay when the extraction
872/// count hits the configured refresh interval.
873///
874/// Runs inline within the caller's task (no nested `tokio::spawn`). Each long-running step
875/// is guarded by `tokio::select!` on `cancel` so shutdown aborts immediately at the next
876/// yield point without leaving orphaned tasks.
877async fn maybe_refresh_communities(
878    extraction_ok: bool,
879    pool: DbPool,
880    provider: AnyProvider,
881    failure_counter: Arc<std::sync::atomic::AtomicU64>,
882    config: &GraphExtractionConfig,
883    cancel: CancellationToken,
884) {
885    use crate::graph::GraphStore;
886
887    if !extraction_ok || config.community_refresh_interval == 0 {
888        return;
889    }
890
891    let store = GraphStore::new(pool.clone());
892    let extraction_count = store.extraction_count().await.unwrap_or(0);
893    if extraction_count == 0
894        || !i64::try_from(config.community_refresh_interval)
895            .is_ok_and(|interval| extraction_count % interval == 0)
896    {
897        return;
898    }
899
900    tracing::info!(extraction_count, "triggering community detection refresh");
901    let store2 = GraphStore::new(pool);
902    let retention_days = config.expired_edge_retention_days;
903    let max_cap = config.max_entities_cap;
904    let max_prompt_bytes = config.community_summary_max_prompt_bytes;
905    let concurrency = config.community_summary_concurrency;
906    let edge_chunk_size = config.lpa_edge_chunk_size;
907    let decay_lambda = config.link_weight_decay_lambda;
908    let decay_interval_secs = config.link_weight_decay_interval_secs;
909
910    tokio::select! {
911        () = cancel.cancelled() => {
912            tracing::debug!("community refresh cancelled before community detection");
913            return;
914        }
915        result = crate::graph::community::detect_communities(
916            &store2,
917            &provider,
918            max_prompt_bytes,
919            concurrency,
920            edge_chunk_size,
921        ) => {
922            match result {
923                Ok(count) => {
924                    tracing::info!(communities = count, "community detection complete");
925                }
926                Err(e) => {
927                    tracing::warn!("community detection failed: {e:#}");
928                    failure_counter.fetch_add(1, Ordering::Relaxed);
929                }
930            }
931        }
932    }
933
934    tokio::select! {
935        () = cancel.cancelled() => {
936            tracing::debug!("community refresh cancelled before graph eviction");
937            return;
938        }
939        result = crate::graph::community::run_graph_eviction(&store2, retention_days, max_cap) => {
940            match result {
941                Ok(stats) => {
942                    tracing::info!(
943                        expired_edges = stats.expired_edges_deleted,
944                        orphan_entities = stats.orphan_entities_deleted,
945                        capped_entities = stats.capped_entities_deleted,
946                        "graph eviction complete"
947                    );
948                }
949                Err(e) => {
950                    tracing::warn!("graph eviction failed: {e:#}");
951                }
952            }
953        }
954    }
955
956    // Time-based link weight decay — independent of eviction cycle.
957    if decay_lambda > 0.0 && decay_interval_secs > 0 {
958        let now_secs = std::time::SystemTime::now()
959            .duration_since(std::time::UNIX_EPOCH)
960            .map_or(0, |d| d.as_secs());
961        let last_decay = store2
962            .get_metadata("last_link_weight_decay_at")
963            .await
964            .ok()
965            .flatten()
966            .and_then(|s| s.parse::<u64>().ok())
967            .unwrap_or(0);
968        if now_secs.saturating_sub(last_decay) >= decay_interval_secs {
969            tokio::select! {
970                () = cancel.cancelled() => {
971                    tracing::debug!("community refresh cancelled before link weight decay");
972                }
973                result = store2.decay_edge_retrieval_counts(decay_lambda, decay_interval_secs) => {
974                    match result {
975                        Ok(affected) => {
976                            tracing::info!(affected, "link weight decay applied");
977                            let _ = store2
978                                .set_metadata("last_link_weight_decay_at", &now_secs.to_string())
979                                .await;
980                        }
981                        Err(e) => {
982                            tracing::warn!("link weight decay failed: {e:#}");
983                        }
984                    }
985                }
986            }
987        }
988    }
989}
990
991#[cfg(test)]
992mod tests {
993    use std::sync::Arc;
994
995    use zeph_llm::any::AnyProvider;
996
997    use super::{NoteLinkingConfig, extract_and_store};
998    use crate::embedding_store::EmbeddingStore;
999    use crate::graph::GraphStore;
1000    use crate::in_memory_store::InMemoryVectorStore;
1001    use crate::store::SqliteStore;
1002
1003    use super::GraphExtractionConfig;
1004
1005    async fn setup() -> (GraphStore, Arc<EmbeddingStore>) {
1006        let sqlite = SqliteStore::new(":memory:").await.unwrap();
1007        let pool = sqlite.pool().clone();
1008        let mem_store = Box::new(InMemoryVectorStore::new());
1009        let emb = Arc::new(EmbeddingStore::with_store(mem_store, pool.clone()));
1010        let gs = GraphStore::new(pool);
1011        (gs, emb)
1012    }
1013
1014    /// Regression test for #1829: `extract_and_store()` must pass the provider to `EntityResolver`
1015    /// so that `store_entity_embedding()` is called and `qdrant_point_id` is set in `SQLite`.
1016    #[tokio::test]
1017    async fn extract_and_store_sets_qdrant_point_id_when_embedding_store_provided() {
1018        let (gs, emb) = setup().await;
1019
1020        // MockProvider: supports embeddings, returns a valid extraction JSON for chat
1021        let extraction_json = r#"{"entities":[{"name":"Rust","type":"language","summary":"systems language"}],"edges":[]}"#;
1022        let mut mock =
1023            zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1024        mock.supports_embeddings = true;
1025        mock.embedding = vec![1.0_f32, 0.0, 0.0, 0.0];
1026        let provider = AnyProvider::Mock(mock);
1027
1028        let config = GraphExtractionConfig {
1029            max_entities: 10,
1030            max_edges: 10,
1031            extraction_timeout_secs: 10,
1032            ..Default::default()
1033        };
1034
1035        let result = extract_and_store(
1036            "Rust is a systems programming language.".to_owned(),
1037            vec![],
1038            provider,
1039            gs.pool().clone(),
1040            config,
1041            None,
1042            Some(emb.clone()),
1043        )
1044        .await
1045        .unwrap();
1046
1047        assert_eq!(
1048            result.stats.entities_upserted, 1,
1049            "one entity should be upserted"
1050        );
1051
1052        // The entity must have a qdrant_point_id — this proves store_entity_embedding() was called.
1053        // Before the fix, EntityResolver was built without a provider, so embed() was never called
1054        // and qdrant_point_id remained NULL.
1055        let entity = gs
1056            .find_entity("rust", crate::graph::EntityType::Language)
1057            .await
1058            .unwrap()
1059            .expect("entity 'rust' must exist in SQLite");
1060
1061        assert!(
1062            entity.qdrant_point_id.is_some(),
1063            "qdrant_point_id must be set when embedding_store + provider are both provided (regression for #1829)"
1064        );
1065    }
1066
1067    /// When no `embedding_store` is provided, `extract_and_store()` must still work correctly
1068    /// (no embeddings stored, but entities are still upserted).
1069    #[tokio::test]
1070    async fn extract_and_store_without_embedding_store_still_upserts_entities() {
1071        let (gs, _emb) = setup().await;
1072
1073        let extraction_json = r#"{"entities":[{"name":"Python","type":"language","summary":"scripting"}],"edges":[]}"#;
1074        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1075        let provider = AnyProvider::Mock(mock);
1076
1077        let config = GraphExtractionConfig {
1078            max_entities: 10,
1079            max_edges: 10,
1080            extraction_timeout_secs: 10,
1081            ..Default::default()
1082        };
1083
1084        let result = extract_and_store(
1085            "Python is a scripting language.".to_owned(),
1086            vec![],
1087            provider,
1088            gs.pool().clone(),
1089            config,
1090            None,
1091            None, // no embedding_store
1092        )
1093        .await
1094        .unwrap();
1095
1096        assert_eq!(result.stats.entities_upserted, 1);
1097
1098        let entity = gs
1099            .find_entity("python", crate::graph::EntityType::Language)
1100            .await
1101            .unwrap()
1102            .expect("entity 'python' must exist");
1103
1104        assert!(
1105            entity.qdrant_point_id.is_none(),
1106            "qdrant_point_id must remain None when no embedding_store is provided"
1107        );
1108    }
1109
1110    /// Regression test for #2166: FTS5 entity writes must be visible to a new connection pool
1111    /// opened after extraction completes. Without `checkpoint_wal()` in `extract_and_store`,
1112    /// a fresh pool sees stale FTS5 shadow tables and `find_entities_fuzzy` returns empty.
1113    #[tokio::test]
1114    async fn extract_and_store_fts5_cross_session_visibility() {
1115        let file = tempfile::NamedTempFile::new().expect("tempfile");
1116        let path = file.path().to_str().expect("valid path").to_string();
1117
1118        // Session A: run extract_and_store on a file DB (not :memory:) so WAL is used.
1119        {
1120            let sqlite = crate::store::SqliteStore::new(&path).await.unwrap();
1121            let extraction_json = r#"{"entities":[{"name":"Ferris","type":"concept","summary":"Rust mascot"}],"edges":[]}"#;
1122            let mock =
1123                zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1124            let provider = AnyProvider::Mock(mock);
1125            let config = GraphExtractionConfig {
1126                max_entities: 10,
1127                max_edges: 10,
1128                extraction_timeout_secs: 10,
1129                ..Default::default()
1130            };
1131            extract_and_store(
1132                "Ferris is the Rust mascot.".to_owned(),
1133                vec![],
1134                provider,
1135                sqlite.pool().clone(),
1136                config,
1137                None,
1138                None,
1139            )
1140            .await
1141            .unwrap();
1142        }
1143
1144        // Session B: new pool — FTS5 must see the entity extracted in session A.
1145        let sqlite_b = crate::store::SqliteStore::new(&path).await.unwrap();
1146        let gs_b = crate::graph::GraphStore::new(sqlite_b.pool().clone());
1147        let results = gs_b.find_entities_fuzzy("Ferris", 10).await.unwrap();
1148        assert!(
1149            !results.is_empty(),
1150            "FTS5 cross-session (#2166): entity extracted in session A must be visible in session B"
1151        );
1152    }
1153
1154    /// Regression test for #2215: self-loop edges (source == target entity) must be silently
1155    /// skipped; no edge row should be inserted.
1156    #[tokio::test]
1157    async fn extract_and_store_skips_self_loop_edges() {
1158        let (gs, _emb) = setup().await;
1159
1160        // LLM returns one entity and one self-loop edge (source == target).
1161        let extraction_json = r#"{
1162            "entities":[{"name":"Rust","type":"language","summary":"systems language"}],
1163            "edges":[{"source":"Rust","target":"Rust","relation":"is","fact":"Rust is Rust","edge_type":"semantic"}]
1164        }"#;
1165        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1166        let provider = AnyProvider::Mock(mock);
1167
1168        let config = GraphExtractionConfig {
1169            max_entities: 10,
1170            max_edges: 10,
1171            extraction_timeout_secs: 10,
1172            ..Default::default()
1173        };
1174
1175        let result = extract_and_store(
1176            "Rust is a language.".to_owned(),
1177            vec![],
1178            provider,
1179            gs.pool().clone(),
1180            config,
1181            None,
1182            None,
1183        )
1184        .await
1185        .unwrap();
1186
1187        assert_eq!(result.stats.entities_upserted, 1);
1188        assert_eq!(
1189            result.stats.edges_inserted, 0,
1190            "self-loop edge must be rejected (#2215)"
1191        );
1192    }
1193
1194    /// When `apex_mem_enabled = true`, edges must be inserted via `insert_or_supersede`
1195    /// (the APEX-MEM append-only path) instead of the legacy `resolve_edge_typed` path.
1196    /// Verifies that edges are still counted as inserted and that the supersession row
1197    /// is created in the database.
1198    #[tokio::test]
1199    async fn apex_mem_path_inserts_edge_via_insert_or_supersede() {
1200        let (gs, _emb) = setup().await;
1201
1202        let extraction_json = r#"{
1203            "entities":[
1204                {"name":"Alice","type":"person","summary":"a person"},
1205                {"name":"Bob","type":"person","summary":"another person"}
1206            ],
1207            "edges":[
1208                {"source":"Alice","target":"Bob","relation":"KNOWS","fact":"Alice knows Bob","edge_type":"semantic"}
1209            ]
1210        }"#;
1211        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1212        let provider = AnyProvider::Mock(mock);
1213
1214        let config = GraphExtractionConfig {
1215            max_entities: 10,
1216            max_edges: 10,
1217            extraction_timeout_secs: 10,
1218            apex_mem_enabled: true,
1219            ..Default::default()
1220        };
1221
1222        let result = extract_and_store(
1223            "Alice knows Bob.".to_owned(),
1224            vec![],
1225            provider,
1226            gs.pool().clone(),
1227            config,
1228            None,
1229            None,
1230        )
1231        .await
1232        .unwrap();
1233
1234        assert_eq!(result.stats.entities_upserted, 2, "two entities expected");
1235        assert_eq!(
1236            result.stats.edges_inserted, 1,
1237            "APEX-MEM path must insert the edge and count it (#3631)"
1238        );
1239
1240        // Verify the edge row exists and its relation preserves display casing.
1241        let alice_id = gs
1242            .find_entity("alice", crate::graph::EntityType::Person)
1243            .await
1244            .unwrap()
1245            .expect("entity 'alice' must exist")
1246            .id
1247            .0;
1248        let bob_id = gs
1249            .find_entity("bob", crate::graph::EntityType::Person)
1250            .await
1251            .unwrap()
1252            .expect("entity 'bob' must exist")
1253            .id
1254            .0;
1255        let edges = gs.edges_exact(alice_id, bob_id).await.unwrap();
1256        assert_eq!(edges.len(), 1, "exactly one edge expected");
1257        // canonical_relation is lowercased; relation field preserves original casing post-strip
1258        assert_eq!(
1259            edges[0].relation, "KNOWS",
1260            "display relation must preserve original casing"
1261        );
1262    }
1263
1264    /// Regression for #4297: `embed_work_items` must return an empty Vec (fail-open) when the
1265    /// batch `join_all` embed call exceeds the 30 s global timeout.
1266    #[tokio::test]
1267    async fn embed_work_items_timeout_returns_empty() {
1268        use zeph_llm::mock::MockProvider;
1269
1270        // embed_delay_ms > 30_000 ms would make the test too slow; we rely on tokio::time::pause
1271        // to advance virtual time instantly, so the timeout fires without real delay.
1272        tokio::time::pause();
1273
1274        // Delay longer than the 30 s timeout (in virtual time).
1275        let mut mock = MockProvider::default();
1276        mock.supports_embeddings = true;
1277        mock.embed_delay_ms = 31_000;
1278        let provider = AnyProvider::Mock(mock);
1279
1280        let work_items = vec![super::EntityWorkItem {
1281            entity_id: 1,
1282            canonical_name: "Alice".to_owned(),
1283            embed_text: "Alice".to_owned(),
1284            self_point_id: None,
1285        }];
1286
1287        let cfg = NoteLinkingConfig {
1288            timeout_secs: 30,
1289            ..NoteLinkingConfig::default()
1290        };
1291        let result = super::embed_work_items(&work_items, &provider, &cfg).await;
1292        assert!(
1293            result.is_empty(),
1294            "embed_work_items must return empty Vec on 30 s timeout (fail-open)"
1295        );
1296    }
1297
1298    /// Regression for #4622: `maybe_refresh_communities` must return immediately when the
1299    /// `CancellationToken` is already cancelled, without hanging or panicking.
1300    ///
1301    /// Before the fix a nested `tokio::spawn` was used with no `CancellationToken`, so shutdown
1302    /// could not interrupt community detection.  The inline `tokio::select!` path now exits at
1303    /// the first select arm when the token is pre-cancelled.
1304    #[tokio::test]
1305    async fn maybe_refresh_communities_respects_cancelled_token() {
1306        use tokio_util::sync::CancellationToken;
1307
1308        use crate::graph::GraphStore;
1309        use crate::store::SqliteStore;
1310
1311        let sqlite = SqliteStore::new(":memory:").await.unwrap();
1312        let pool = sqlite.pool().clone();
1313        let gs = GraphStore::new(pool.clone());
1314
1315        // Seed extraction_count=1 so the interval check passes (1 % 1 == 0).
1316        gs.set_metadata("extraction_count", "1").await.unwrap();
1317
1318        let config = GraphExtractionConfig {
1319            community_refresh_interval: 1,
1320            ..Default::default()
1321        };
1322
1323        let cancel = CancellationToken::new();
1324        cancel.cancel(); // pre-cancelled — all select! arms must short-circuit immediately
1325
1326        let extraction_json = r#"{"entities":[],"edges":[]}"#;
1327        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1328        let provider = AnyProvider::Mock(mock);
1329
1330        let failure_counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
1331
1332        // Must complete promptly — if the fix regresses and a blocking call is made this will
1333        // hang forever (caught by tokio::time::timeout in CI or a test runtime timeout).
1334        super::maybe_refresh_communities(
1335            true,
1336            pool,
1337            provider,
1338            failure_counter.clone(),
1339            &config,
1340            cancel,
1341        )
1342        .await;
1343
1344        assert_eq!(
1345            failure_counter.load(std::sync::atomic::Ordering::Relaxed),
1346            0,
1347            "no failures should be recorded when cancelled before any detection step"
1348        );
1349    }
1350
1351    #[test]
1352    fn is_low_signal_known_values() {
1353        assert!(
1354            super::is_low_signal_relation("related_to"),
1355            "related_to must be low-signal"
1356        );
1357        assert!(
1358            super::is_low_signal_relation("related to"),
1359            "related to (space) must be low-signal"
1360        );
1361        assert!(
1362            super::is_low_signal_relation("IS"),
1363            "IS (uppercase) must be low-signal (case-insensitive)"
1364        );
1365        assert!(
1366            super::is_low_signal_relation("mentions"),
1367            "mentions must be low-signal"
1368        );
1369    }
1370
1371    #[test]
1372    fn is_low_signal_specific_relations_pass() {
1373        assert!(
1374            !super::is_low_signal_relation("causes"),
1375            "causes must NOT be low-signal"
1376        );
1377        assert!(
1378            !super::is_low_signal_relation("works_at"),
1379            "works_at must NOT be low-signal"
1380        );
1381        assert!(
1382            !super::is_low_signal_relation("born_in"),
1383            "born_in must NOT be low-signal"
1384        );
1385    }
1386
1387    /// Regression test for #4711: configured Benna-Fusi rates must produce different
1388    /// `confidence_fast`/`confidence_slow` values than the hardcoded defaults.
1389    ///
1390    /// `extract_and_store` builds `GraphStore::new(pool).with_benna_rates(fast, slow)` using
1391    /// `config.benna_fast_rate` / `config.benna_slow_rate`.  Before the fix it called
1392    /// `GraphStore::new(pool)` only, so the configured rates were silently ignored.
1393    ///
1394    /// This test calls `GraphStore::insert_edge_typed` directly (bypassing the resolver dedup
1395    /// layer) to exercise the Benna-Fusi UPDATE path with two confidence levels (0.6 → 0.8).
1396    ///
1397    /// Math:
1398    ///   default (0.5/0.05):  fast = 0.6 + 0.5*(0.8-0.6) = 0.7;  slow ≈ 0.605
1399    ///   custom  (0.1/0.02):  fast = 0.6 + 0.1*(0.8-0.6) = 0.62; slow ≈ 0.6004
1400    #[tokio::test]
1401    async fn extract_and_store_respects_configured_benna_rates() {
1402        use crate::graph::EdgeType;
1403
1404        async fn run_two_inserts(fast_rate: f32, slow_rate: f32) -> crate::graph::types::Edge {
1405            let sqlite = crate::store::SqliteStore::new(":memory:").await.unwrap();
1406            let pool = sqlite.pool().clone();
1407            let gs = GraphStore::new(pool).with_benna_rates(fast_rate, slow_rate);
1408
1409            let alice_id = gs
1410                .upsert_entity("Alice", "alice", crate::graph::EntityType::Person, None)
1411                .await
1412                .unwrap();
1413            let bob_id = gs
1414                .upsert_entity("Bob", "bob", crate::graph::EntityType::Person, None)
1415                .await
1416                .unwrap();
1417
1418            // Pass 1: INSERT — seeds confidence_fast = confidence_slow = 0.6.
1419            gs.insert_edge_typed(
1420                alice_id.0,
1421                bob_id.0,
1422                "knows",
1423                "Alice knows Bob",
1424                0.6,
1425                None,
1426                EdgeType::Semantic,
1427            )
1428            .await
1429            .unwrap();
1430
1431            // Pass 2: UPDATE — triggers Benna-Fusi merge with incoming confidence = 0.8.
1432            gs.insert_edge_typed(
1433                alice_id.0,
1434                bob_id.0,
1435                "knows",
1436                "Alice knows Bob",
1437                0.8,
1438                None,
1439                EdgeType::Semantic,
1440            )
1441            .await
1442            .unwrap();
1443
1444            let mut edges = gs.edges_exact(alice_id.0, bob_id.0).await.unwrap();
1445            assert_eq!(edges.len(), 1, "exactly one active edge expected");
1446            edges.remove(0)
1447        }
1448
1449        let default_edge = run_two_inserts(0.5, 0.05).await;
1450        let custom_edge = run_two_inserts(0.1, 0.02).await;
1451
1452        // Different rates → different fast/slow.  Before the fix extract_and_store ignored the
1453        // config fields; all edges would have been identical regardless of configured rates.
1454        assert!(
1455            (default_edge.confidence_fast - custom_edge.confidence_fast).abs() > f32::EPSILON,
1456            "confidence_fast must differ between default (0.5) and custom (0.1) benna_fast_rate (#4711)"
1457        );
1458        assert!(
1459            (default_edge.confidence_slow - custom_edge.confidence_slow).abs() > f32::EPSILON,
1460            "confidence_slow must differ between default (0.05) and custom (0.02) benna_slow_rate (#4711)"
1461        );
1462        // Higher fast_rate → fast variable grows more aggressively: 0.7 (default) > 0.62 (custom).
1463        assert!(
1464            default_edge.confidence_fast > custom_edge.confidence_fast,
1465            "higher benna_fast_rate must produce a larger confidence_fast after merge"
1466        );
1467    }
1468
1469    /// Regression test for #4723: `extract_and_store` must forward `ExtractedEdge.confidence`
1470    /// to the graph resolver instead of always using the hardcoded fallback 0.8.
1471    ///
1472    /// The LLM JSON sets `confidence: 0.3`. Before the fix, line 628 passed `0.8` unconditionally;
1473    /// after the fix it passes `edge.confidence.unwrap_or(0.8)` which is `0.3` when present.
1474    /// A freshly-inserted edge sets `confidence_fast = confidence`, so we compare against 0.3.
1475    #[tokio::test]
1476    async fn extract_and_store_forwards_edge_confidence_not_hardcoded_08() {
1477        use crate::graph::{EntityType, GraphStore};
1478
1479        let sqlite = crate::store::SqliteStore::new(":memory:").await.unwrap();
1480        let pool = sqlite.pool().clone();
1481
1482        // confidence = 0.3 is far enough from 0.8 that float imprecision cannot mask the bug.
1483        let extraction_json = r#"{
1484            "entities":[
1485                {"name":"Alice","type":"person","summary":"person"},
1486                {"name":"Bob","type":"person","summary":"person"}
1487            ],
1488            "edges":[{"source":"Alice","target":"Bob","relation":"knows","fact":"Alice knows Bob","edge_type":"semantic","confidence":0.3}]
1489        }"#;
1490        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1491        let provider = AnyProvider::Mock(mock);
1492        let config = GraphExtractionConfig {
1493            max_entities: 10,
1494            max_edges: 10,
1495            extraction_timeout_secs: 10,
1496            ..Default::default()
1497        };
1498
1499        let result = extract_and_store(
1500            "Alice knows Bob.".to_owned(),
1501            vec![],
1502            provider,
1503            pool.clone(),
1504            config,
1505            None,
1506            None,
1507        )
1508        .await
1509        .unwrap();
1510
1511        assert_eq!(result.stats.edges_inserted, 1, "one edge must be inserted");
1512
1513        let gs = GraphStore::new(pool);
1514        let alice_id: i64 = gs
1515            .find_entity("alice", EntityType::Person)
1516            .await
1517            .unwrap()
1518            .expect("alice must exist")
1519            .id
1520            .0;
1521        let bob_id: i64 = gs
1522            .find_entity("bob", EntityType::Person)
1523            .await
1524            .unwrap()
1525            .expect("bob must exist")
1526            .id
1527            .0;
1528
1529        let mut edges = gs.edges_exact(alice_id, bob_id).await.unwrap();
1530        assert_eq!(edges.len(), 1, "exactly one active edge expected");
1531        let edge = edges.remove(0);
1532
1533        // Before fix: confidence_fast would be ~0.8 (hardcoded); after fix: ~0.3 (from JSON).
1534        assert!(
1535            (edge.confidence_fast - 0.3_f32).abs() < 0.01,
1536            "confidence_fast must be ~0.3 (from ExtractedEdge.confidence), got {} (regression for #4723)",
1537            edge.confidence_fast
1538        );
1539    }
1540
1541    /// Regression for #4723 (APEX-MEM path): `extract_and_store` must forward
1542    /// `ExtractedEdge.confidence` to `insert_or_supersede_with_turn_index_and_metrics` instead
1543    /// of using the hardcoded literal `0.8`.
1544    #[tokio::test]
1545    async fn extract_and_store_apex_forwards_edge_confidence_not_hardcoded_08() {
1546        use crate::graph::{EntityType, GraphStore};
1547
1548        let sqlite = crate::store::SqliteStore::new(":memory:").await.unwrap();
1549        let pool = sqlite.pool().clone();
1550
1551        // confidence = 0.3 is far enough from 0.8 that float imprecision cannot mask the bug.
1552        let extraction_json = r#"{
1553            "entities":[
1554                {"name":"Alice","type":"person","summary":"person"},
1555                {"name":"Bob","type":"person","summary":"person"}
1556            ],
1557            "edges":[{"source":"Alice","target":"Bob","relation":"knows","fact":"Alice knows Bob","edge_type":"semantic","confidence":0.3}]
1558        }"#;
1559        let mock = zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
1560        let provider = AnyProvider::Mock(mock);
1561        let config = GraphExtractionConfig {
1562            max_entities: 10,
1563            max_edges: 10,
1564            extraction_timeout_secs: 10,
1565            apex_mem_enabled: true,
1566            ..Default::default()
1567        };
1568
1569        let result = extract_and_store(
1570            "Alice knows Bob.".to_owned(),
1571            vec![],
1572            provider,
1573            pool.clone(),
1574            config,
1575            None,
1576            None,
1577        )
1578        .await
1579        .unwrap();
1580
1581        assert_eq!(result.stats.edges_inserted, 1, "one edge must be inserted");
1582
1583        let gs = GraphStore::new(pool);
1584        let alice_id: i64 = gs
1585            .find_entity("alice", EntityType::Person)
1586            .await
1587            .unwrap()
1588            .expect("alice must exist")
1589            .id
1590            .0;
1591        let bob_id: i64 = gs
1592            .find_entity("bob", EntityType::Person)
1593            .await
1594            .unwrap()
1595            .expect("bob must exist")
1596            .id
1597            .0;
1598
1599        let mut edges = gs.edges_exact(alice_id, bob_id).await.unwrap();
1600        assert_eq!(edges.len(), 1, "exactly one active edge expected");
1601        let edge = edges.remove(0);
1602
1603        // Before fix: confidence_fast would be ~0.8 (hardcoded); after fix: ~0.3 (from JSON).
1604        assert!(
1605            (edge.confidence_fast - 0.3_f32).abs() < 0.01,
1606            "confidence_fast must be ~0.3 (from ExtractedEdge.confidence), got {} (regression for #4723 APEX path)",
1607            edge.confidence_fast
1608        );
1609    }
1610
1611    /// Verify `GraphExtractionConfig` default benna rates match `GraphStore::new` defaults.
1612    ///
1613    /// If either default is changed in one place but not the other, behavior silently diverges
1614    /// between paths that call `with_benna_rates` and paths that don't.
1615    #[test]
1616    fn graph_extraction_config_benna_defaults_match_graph_store_defaults() {
1617        let cfg = GraphExtractionConfig::default();
1618        // GraphStore::new uses 0.5 / 0.05 — these must stay in sync.
1619        assert!(
1620            (cfg.benna_fast_rate - 0.5_f32).abs() < f32::EPSILON,
1621            "benna_fast_rate default must match GraphStore::new default of 0.5"
1622        );
1623        assert!(
1624            (cfg.benna_slow_rate - 0.05_f32).abs() < f32::EPSILON,
1625            "benna_slow_rate default must match GraphStore::new default of 0.05"
1626        );
1627    }
1628}