Skip to main content

zeph_memory/semantic/
graph.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::sync::Arc;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use std::sync::atomic::Ordering;
9use zeph_db::DbPool;
10
11pub use zeph_common::config::memory::NoteLinkingConfig;
12use zeph_common::sanitize::strip_control_chars;
13use zeph_common::text::truncate_to_bytes_ref;
14use zeph_llm::any::AnyProvider;
15use zeph_llm::provider::LlmProvider as _;
16
17use crate::embedding_store::EmbeddingStore;
18use crate::error::MemoryError;
19use crate::graph::extractor::ExtractionResult as ExtractorResult;
20use crate::vector_store::VectorFilter;
21
22use super::SemanticMemory;
23
24/// Callback type for post-extraction validation.
25///
26/// A generic predicate opaque to zeph-memory — callers (zeph-core) provide security
27/// validation without introducing a dependency on security policy in this crate.
28pub type PostExtractValidator = Option<Box<dyn Fn(&ExtractorResult) -> Result<(), String> + Send>>;
29
30/// Config for the spawned background extraction task.
31///
32/// Owned clone of the relevant fields from `GraphConfig` — no references, safe to send to
33/// spawned tasks.
34#[derive(Debug, Clone)]
35pub struct GraphExtractionConfig {
36    pub max_entities: usize,
37    pub max_edges: usize,
38    pub extraction_timeout_secs: u64,
39    pub community_refresh_interval: usize,
40    pub expired_edge_retention_days: u32,
41    pub max_entities_cap: usize,
42    pub community_summary_max_prompt_bytes: usize,
43    pub community_summary_concurrency: usize,
44    pub lpa_edge_chunk_size: usize,
45    /// A-MEM note linking config, cloned from `GraphConfig.note_linking`.
46    pub note_linking: NoteLinkingConfig,
47    /// A-MEM link weight decay lambda. Range: `(0.0, 1.0]`. Default: `0.95`.
48    pub link_weight_decay_lambda: f64,
49    /// Seconds between link weight decay passes. Default: `86400`.
50    pub link_weight_decay_interval_secs: u64,
51    /// Kumiho belief revision: enable semantic contradiction detection for edges.
52    pub belief_revision_enabled: bool,
53    /// Cosine similarity threshold for belief revision contradiction detection.
54    pub belief_revision_similarity_threshold: f32,
55    /// GAAMA episode linking: `conversation_id` to link extracted entities to their episode.
56    /// `None` disables episode linking for this extraction pass.
57    pub conversation_id: Option<i64>,
58    /// APEX-MEM: use `insert_or_supersede` instead of `resolve_edge_typed`. Default: `false`.
59    pub apex_mem_enabled: bool,
60    /// LLM call timeout for extraction, in seconds. Default: `30`.
61    pub llm_timeout_secs: u64,
62}
63
64impl Default for GraphExtractionConfig {
65    fn default() -> Self {
66        Self {
67            max_entities: 0,
68            max_edges: 0,
69            extraction_timeout_secs: 0,
70            community_refresh_interval: 0,
71            expired_edge_retention_days: 0,
72            max_entities_cap: 0,
73            community_summary_max_prompt_bytes: 0,
74            community_summary_concurrency: 0,
75            lpa_edge_chunk_size: 0,
76            note_linking: NoteLinkingConfig::default(),
77            link_weight_decay_lambda: 0.95,
78            link_weight_decay_interval_secs: 86400,
79            belief_revision_enabled: false,
80            belief_revision_similarity_threshold: 0.85,
81            conversation_id: None,
82            apex_mem_enabled: false,
83            llm_timeout_secs: 30,
84        }
85    }
86}
87
/// Counters describing one completed extraction pass.
///
/// Plain data: derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` and `Default`
/// so callers can copy and compare results directly.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ExtractionStats {
    /// Number of extracted entities that were successfully resolved/upserted.
    pub entities_upserted: usize,
    /// Number of edges that were actually inserted.
    pub edges_inserted: usize,
}
94
95/// Result returned from `extract_and_store`, combining stats with entity IDs needed for linking.
96#[derive(Debug, Default)]
97pub struct ExtractionResult {
98    pub stats: ExtractionStats,
99    /// IDs of entities upserted during this extraction pass. Passed to `link_memory_notes`.
100    pub entity_ids: Vec<i64>,
101}
102
/// Counters describing one completed note-linking pass.
///
/// Plain data: derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` and `Default`
/// so callers can copy and compare results directly.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct LinkingStats {
    /// Number of entities whose similarity search succeeded and was examined.
    pub entities_processed: usize,
    /// Number of `similar_to` edges inserted.
    pub edges_created: usize,
}
109
/// Name of the Qdrant collection holding entity embeddings.
///
/// Duplicates the constant in `resolver.rs`; the two must stay equal.
const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Byte cap for relation strings when sanitizing APEX-MEM inputs.
/// Duplicates the constant in `graph/resolver/mod.rs`.
const MAX_RELATION_BYTES: usize = 256;
/// Byte cap for fact strings when sanitizing APEX-MEM inputs (2 KiB).
/// Duplicates the constant in `graph/resolver/mod.rs`.
const MAX_FACT_BYTES: usize = 2 * 1024;
117
/// Per-entity state carried through the phases of a note-linking pass.
struct EntityWorkItem {
    /// Graph-store ID of the entity being linked.
    entity_id: i64,
    /// Canonical entity name; used in log messages.
    canonical_name: String,
    /// Text sent to the embedding provider (`name` or `name: summary`).
    embed_text: String,
    /// The entity's own `qdrant_point_id`, if it has one; used to drop the self-match.
    self_point_id: Option<String>,
}
125
126/// Link newly extracted entities to semantically similar entities in the graph.
127///
128/// For each entity in `entity_ids`:
129/// 1. Load the entity name + summary from `SQLite`.
130/// 2. Embed all entity texts in parallel.
131/// 3. Search the entity embedding collection in parallel for the `top_k + 1` most similar points.
132/// 4. Filter out the entity itself (by `qdrant_point_id` or `entity_id` payload) and points
133///    below `similarity_threshold`.
134/// 5. Insert a unidirectional `similar_to` edge where `source_id < target_id` to avoid
135///    double-counting in BFS recall while still being traversable via the OR clause in
136///    `edges_for_entity`. The edge confidence is set to the cosine similarity score.
137/// 6. Deduplicate pairs within a single pass so that a pair encountered from both A→B and B→A
138///    directions is only inserted once, keeping `edges_created` accurate.
139///
140/// Errors are logged and not propagated — this is a best-effort background enrichment step.
141pub async fn link_memory_notes(
142    entity_ids: &[i64],
143    pool: DbPool,
144    embedding_store: Arc<EmbeddingStore>,
145    provider: AnyProvider,
146    cfg: &NoteLinkingConfig,
147) -> LinkingStats {
148    use crate::graph::GraphStore;
149
150    let store = GraphStore::new(pool);
151    let mut stats = LinkingStats::default();
152
153    let work_items = collect_note_link_work_items(entity_ids, &store).await;
154    if work_items.is_empty() {
155        return stats;
156    }
157
158    let valid = embed_work_items(&work_items, &provider, cfg).await;
159
160    let search_limit = cfg.top_k + 1; // +1 to account for self-match
161    let search_results = search_similar_for_items(&valid, &embedding_store, search_limit).await;
162
163    insert_similarity_edges(
164        &work_items,
165        &valid,
166        &search_results,
167        cfg,
168        &store,
169        &mut stats,
170    )
171    .await;
172
173    stats
174}
175
176/// Phase 1: load entities from the DB and build work items for embedding.
177///
178/// Processes entities sequentially to avoid connection-pool contention.
179async fn collect_note_link_work_items(
180    entity_ids: &[i64],
181    store: &crate::graph::GraphStore,
182) -> Vec<EntityWorkItem> {
183    let mut work_items: Vec<EntityWorkItem> = Vec::with_capacity(entity_ids.len());
184    for &entity_id in entity_ids {
185        let entity = match store.find_entity_by_id(entity_id).await {
186            Ok(Some(e)) => e,
187            Ok(None) => {
188                tracing::debug!("note_linking: entity {entity_id} not found, skipping");
189                continue;
190            }
191            Err(e) => {
192                tracing::debug!("note_linking: DB error loading entity {entity_id}: {e:#}");
193                continue;
194            }
195        };
196        let embed_text = match &entity.summary {
197            Some(s) if !s.is_empty() => format!("{}: {s}", entity.canonical_name),
198            _ => entity.canonical_name.clone(),
199        };
200        work_items.push(EntityWorkItem {
201            entity_id,
202            canonical_name: entity.canonical_name,
203            embed_text,
204            self_point_id: entity.qdrant_point_id,
205        });
206    }
207    work_items
208}
209
210/// Phase 2: embed all entity texts in parallel.
211///
212/// Returns `(work_idx, embedding)` pairs for successfully embedded items.
213/// Items that fail to embed are logged and dropped.
214async fn embed_work_items(
215    work_items: &[EntityWorkItem],
216    provider: &AnyProvider,
217    cfg: &NoteLinkingConfig,
218) -> Vec<(usize, Vec<f32>)> {
219    use futures::future;
220
221    let Ok(embed_results) = tokio::time::timeout(
222        std::time::Duration::from_secs(cfg.timeout_secs),
223        future::join_all(work_items.iter().map(|w| provider.embed(&w.embed_text))),
224    )
225    .await
226    else {
227        tracing::warn!(
228            count = work_items.len(),
229            "note_linking: batch embed timed out — skipping all entities"
230        );
231        return Vec::new();
232    };
233    embed_results
234        .into_iter()
235        .enumerate()
236        .filter_map(|(i, r)| match r {
237            Ok(v) => Some((i, v)),
238            Err(e) => {
239                tracing::debug!(
240                    "note_linking: embed failed for entity {:?}: {e:#}",
241                    work_items[i].canonical_name
242                );
243                None
244            }
245        })
246        .collect()
247}
248
249/// Phase 3: search the embedding store for similar entities for each embedded work item.
250async fn search_similar_for_items(
251    valid: &[(usize, Vec<f32>)],
252    embedding_store: &EmbeddingStore,
253    search_limit: usize,
254) -> Vec<Result<Vec<crate::ScoredVectorPoint>, MemoryError>> {
255    use futures::future;
256
257    future::join_all(valid.iter().map(|(_, vec)| {
258        embedding_store.search_collection(
259            ENTITY_COLLECTION,
260            vec,
261            search_limit,
262            None::<VectorFilter>,
263        )
264    }))
265    .await
266}
267
268/// Phase 4: insert similarity edges, deduplicating pairs seen from both A→B and B→A.
269///
270/// Without deduplication, both directions would call `insert_edge` for the same normalised
271/// pair and both return `Ok`, inflating `edges_created` by the number of bidirectional hits.
272async fn insert_similarity_edges(
273    work_items: &[EntityWorkItem],
274    valid: &[(usize, Vec<f32>)],
275    search_results: &[Result<Vec<crate::ScoredVectorPoint>, MemoryError>],
276    cfg: &NoteLinkingConfig,
277    store: &crate::graph::GraphStore,
278    stats: &mut LinkingStats,
279) {
280    let mut seen_pairs = std::collections::HashSet::new();
281
282    for ((work_idx, _), search_result) in valid.iter().zip(search_results.iter()) {
283        let w = &work_items[*work_idx];
284
285        let results = match search_result {
286            Ok(r) => r,
287            Err(e) => {
288                tracing::debug!(
289                    "note_linking: search failed for entity {:?}: {e:#}",
290                    w.canonical_name
291                );
292                continue;
293            }
294        };
295
296        stats.entities_processed += 1;
297
298        let self_point_id = w.self_point_id.as_deref();
299        let candidates = results
300            .iter()
301            .filter(|p| Some(p.id.as_str()) != self_point_id && p.score >= cfg.similarity_threshold)
302            .take(cfg.top_k);
303
304        for point in candidates {
305            let Some(target_id) = point
306                .payload
307                .get("entity_id")
308                .and_then(serde_json::Value::as_i64)
309            else {
310                tracing::debug!(
311                    "note_linking: missing entity_id in payload for point {}",
312                    point.id
313                );
314                continue;
315            };
316
317            if target_id == w.entity_id {
318                continue; // secondary self-guard when qdrant_point_id is null
319            }
320
321            // Normalise direction: always store source_id < target_id.
322            let (src, tgt) = if w.entity_id < target_id {
323                (w.entity_id, target_id)
324            } else {
325                (target_id, w.entity_id)
326            };
327
328            if !seen_pairs.insert((src, tgt)) {
329                continue;
330            }
331
332            let fact = format!("Semantically similar entities (score: {:.3})", point.score);
333
334            match store
335                .insert_edge(src, tgt, "similar_to", &fact, point.score, None)
336                .await
337            {
338                Ok(_) => stats.edges_created += 1,
339                Err(e) => {
340                    tracing::debug!("note_linking: insert_edge failed: {e:#}");
341                }
342            }
343        }
344    }
345}
346
347/// Extract entities and edges from `content` and persist them to the graph store.
348///
349/// This function runs inside a spawned task — it receives owned data only.
350///
351/// The optional `embedding_store` enables entity embedding storage in Qdrant, which is
352/// required for A-MEM note linking to find semantically similar entities across sessions.
353///
354/// # Errors
355///
356/// Returns an error if the database query fails or LLM extraction fails.
357#[cfg_attr(
358    feature = "profiling",
359    tracing::instrument(name = "memory.graph_extract", skip_all, fields(entities = tracing::field::Empty, edges = tracing::field::Empty))
360)]
361pub async fn extract_and_store(
362    content: String,
363    context_messages: Vec<String>,
364    provider: AnyProvider,
365    pool: DbPool,
366    config: GraphExtractionConfig,
367    post_extract_validator: PostExtractValidator,
368    embedding_store: Option<Arc<EmbeddingStore>>,
369) -> Result<ExtractionResult, MemoryError> {
370    use crate::graph::{EntityResolver, GraphExtractor, GraphStore};
371
372    let extractor = GraphExtractor::new(
373        provider.clone(),
374        config.max_entities,
375        config.max_edges,
376        config.llm_timeout_secs,
377    );
378    let ctx_refs: Vec<&str> = context_messages.iter().map(String::as_str).collect();
379
380    let store = GraphStore::new(pool);
381
382    bump_extraction_count(store.pool()).await?;
383
384    let Some(result) = extractor.extract(&content, &ctx_refs).await? else {
385        return Ok(ExtractionResult::default());
386    };
387
388    // Post-extraction validation callback. zeph-memory does not know the callback is a
389    // security validator — it is a generic predicate opaque to this crate (design decision D1).
390    if let Some(ref validator) = post_extract_validator
391        && let Err(reason) = validator(&result)
392    {
393        tracing::warn!(
394            reason,
395            "graph extraction validation failed, skipping upsert"
396        );
397        return Ok(ExtractionResult::default());
398    }
399
400    let resolver = if let Some(ref emb) = embedding_store {
401        EntityResolver::new(&store)
402            .with_embedding_store(emb)
403            .with_provider(&provider)
404    } else {
405        EntityResolver::new(&store)
406    };
407
408    let (entity_name_to_id, entities_upserted) = upsert_entities(&resolver, &result.entities).await;
409    let edges_inserted = insert_edges(&resolver, &result.edges, &entity_name_to_id, &config).await;
410
411    #[cfg(any(feature = "sqlite", feature = "postgres"))]
412    store.checkpoint_wal().await?;
413
414    let new_entity_ids: Vec<i64> = entity_name_to_id.into_values().collect();
415
416    link_episode(&store, &config, &new_entity_ids).await;
417
418    #[cfg(feature = "profiling")]
419    {
420        let span = tracing::Span::current();
421        span.record("entities", entities_upserted);
422        span.record("edges", edges_inserted);
423    }
424
425    Ok(ExtractionResult {
426        stats: ExtractionStats {
427            entities_upserted,
428            edges_inserted,
429        },
430        entity_ids: new_entity_ids,
431    })
432}
433
434/// Increment the extraction counter in `graph_metadata`.
435async fn bump_extraction_count(pool: &DbPool) -> Result<(), MemoryError> {
436    zeph_db::query(sql!(
437        "INSERT INTO graph_metadata (key, value) VALUES ('extraction_count', '0')
438         ON CONFLICT(key) DO NOTHING"
439    ))
440    .execute(pool)
441    .await?;
442    zeph_db::query(sql!(
443        "UPDATE graph_metadata
444         SET value = CAST(CAST(value AS INTEGER) + 1 AS TEXT)
445         WHERE key = 'extraction_count'"
446    ))
447    .execute(pool)
448    .await?;
449    Ok(())
450}
451
452/// Upsert all extracted entities and return the name-to-id map and upsert count.
453async fn upsert_entities(
454    resolver: &crate::graph::EntityResolver<'_>,
455    entities: &[crate::graph::extractor::ExtractedEntity],
456) -> (std::collections::HashMap<String, i64>, usize) {
457    let mut entity_name_to_id: std::collections::HashMap<String, i64> =
458        std::collections::HashMap::new();
459    let mut entities_upserted = 0usize;
460
461    for entity in entities {
462        match resolver
463            .resolve(&entity.name, &entity.entity_type, entity.summary.as_deref())
464            .await
465        {
466            Ok((id, _outcome)) => {
467                entity_name_to_id.insert(entity.name.clone(), id);
468                entities_upserted += 1;
469            }
470            Err(e) => {
471                tracing::debug!("graph: skipping entity {:?}: {e:#}", entity.name);
472            }
473        }
474    }
475
476    (entity_name_to_id, entities_upserted)
477}
478
479/// Insert extracted edges that have both endpoints in `name_to_id`.
480///
481/// Returns the number of edges actually inserted.
482async fn insert_edges(
483    resolver: &crate::graph::EntityResolver<'_>,
484    edges: &[crate::graph::extractor::ExtractedEdge],
485    name_to_id: &std::collections::HashMap<String, i64>,
486    config: &GraphExtractionConfig,
487) -> usize {
488    let mut edges_inserted = 0usize;
489    for edge in edges {
490        let (Some(&src_id), Some(&tgt_id)) =
491            (name_to_id.get(&edge.source), name_to_id.get(&edge.target))
492        else {
493            tracing::debug!(
494                "graph: skipping edge {:?}->{:?}: entity not resolved",
495                edge.source,
496                edge.target
497            );
498            continue;
499        };
500        if src_id == tgt_id {
501            tracing::debug!(
502                "graph: skipping self-loop edge {:?}->{:?} (entity_id={src_id})",
503                edge.source,
504                edge.target
505            );
506            continue;
507        }
508        // Parse LLM-provided edge_type; default to Semantic on any parse failure so
509        // edges are never dropped due to classification errors.
510        let edge_type = edge
511            .edge_type
512            .parse::<crate::graph::EdgeType>()
513            .unwrap_or_else(|_| {
514                tracing::warn!(
515                    raw_type = %edge.edge_type,
516                    "graph: unknown edge_type from LLM, defaulting to semantic"
517                );
518                crate::graph::EdgeType::Semantic
519            });
520        if config.apex_mem_enabled {
521            // APEX-MEM: append-only write path with supersession chains.
522            let relation_trimmed = edge.relation.trim();
523            let relation_display_clean = strip_control_chars(relation_trimmed);
524            let relation_display =
525                truncate_to_bytes_ref(&relation_display_clean, MAX_RELATION_BYTES).to_owned();
526            let canonical_clean = strip_control_chars(&relation_trimmed.to_lowercase());
527            let canonical_relation =
528                truncate_to_bytes_ref(&canonical_clean, MAX_RELATION_BYTES).to_owned();
529            let fact_clean = strip_control_chars(edge.fact.trim());
530            let normalized_fact = truncate_to_bytes_ref(&fact_clean, MAX_FACT_BYTES).to_owned();
531            match resolver
532                .graph_store()
533                .insert_or_supersede(
534                    src_id,
535                    tgt_id,
536                    &relation_display,
537                    &canonical_relation,
538                    &normalized_fact,
539                    0.8,
540                    None,
541                    edge_type,
542                    true,
543                )
544                .await
545            {
546                Ok(_) => edges_inserted += 1,
547                Err(e) => {
548                    tracing::debug!("graph: skipping edge (apex): {e:#}");
549                }
550            }
551        } else {
552            let belief_cfg =
553                config
554                    .belief_revision_enabled
555                    .then_some(crate::graph::BeliefRevisionConfig {
556                        similarity_threshold: config.belief_revision_similarity_threshold,
557                    });
558            match resolver
559                .resolve_edge_typed(
560                    src_id,
561                    tgt_id,
562                    &edge.relation,
563                    &edge.fact,
564                    0.8,
565                    None,
566                    edge_type,
567                    belief_cfg.as_ref(),
568                )
569                .await
570            {
571                Ok(Some(_)) => edges_inserted += 1,
572                Ok(None) => {} // deduplicated
573                Err(e) => {
574                    tracing::debug!("graph: skipping edge: {e:#}");
575                }
576            }
577        }
578    }
579    edges_inserted
580}
581
582/// Link extracted entities to their GAAMA episode when a conversation ID is configured.
583async fn link_episode(
584    store: &crate::graph::GraphStore,
585    config: &GraphExtractionConfig,
586    entity_ids: &[i64],
587) {
588    let Some(conv_id) = config.conversation_id else {
589        return;
590    };
591    match store.ensure_episode(conv_id).await {
592        Ok(episode_id) => {
593            for &entity_id in entity_ids {
594                if let Err(e) = store.link_entity_to_episode(episode_id, entity_id).await {
595                    tracing::debug!("episode linking skipped for entity {entity_id}: {e:#}");
596                }
597            }
598        }
599        Err(e) => {
600            tracing::warn!("failed to ensure episode for conversation {conv_id}: {e:#}");
601        }
602    }
603}
604
605impl SemanticMemory {
606    /// Spawn background graph extraction for a message. Fire-and-forget — never blocks.
607    ///
608    /// Extraction runs in a separate tokio task with a timeout. Any error or timeout is
609    /// logged and the task exits silently; the agent response is never blocked.
610    ///
611    /// The optional `post_extract_validator` is called after extraction, before upsert.
612    /// It is a generic predicate opaque to zeph-memory (design decision D1).
613    ///
614    /// When `config.note_linking.enabled` is `true` and an embedding store is available,
615    /// `link_memory_notes` runs after successful extraction inside the same task, bounded
616    /// by `config.note_linking.timeout_secs`.
617    pub fn spawn_graph_extraction(
618        &self,
619        content: String,
620        context_messages: Vec<String>,
621        config: GraphExtractionConfig,
622        post_extract_validator: PostExtractValidator,
623        provider_override: Option<AnyProvider>,
624    ) -> tokio::task::JoinHandle<()> {
625        let using_override = provider_override.is_some();
626        let provider = provider_override.unwrap_or_else(|| self.provider.clone());
627        if using_override {
628            tracing::debug!(
629                extract_provider = provider.name(),
630                "graph extraction using override provider (quality_gate bypassed)"
631            );
632        }
633        let ctx = GraphExtractionTaskCtx {
634            pool: self.sqlite.pool().clone(),
635            provider,
636            failure_counter: self.community_detection_failures.clone(),
637            extraction_count: self.graph_extraction_count.clone(),
638            extraction_failures: self.graph_extraction_failures.clone(),
639            embedding_store: self.qdrant.clone(),
640        };
641
642        tokio::spawn(run_graph_extraction_task(
643            content,
644            context_messages,
645            config,
646            post_extract_validator,
647            ctx,
648        ))
649    }
650}
651
652/// Owned context bundled for the spawned extraction task.
653///
654/// Bundles the Arcs that must be cloned before entering `tokio::spawn`.
655struct GraphExtractionTaskCtx {
656    pool: DbPool,
657    provider: AnyProvider,
658    failure_counter: Arc<std::sync::atomic::AtomicU64>,
659    extraction_count: Arc<std::sync::atomic::AtomicU64>,
660    extraction_failures: Arc<std::sync::atomic::AtomicU64>,
661    embedding_store: Option<Arc<EmbeddingStore>>,
662}
663
664/// Body of the spawned graph-extraction task.
665async fn run_graph_extraction_task(
666    content: String,
667    context_messages: Vec<String>,
668    config: GraphExtractionConfig,
669    post_extract_validator: PostExtractValidator,
670    ctx: GraphExtractionTaskCtx,
671) {
672    let timeout_dur = std::time::Duration::from_secs(config.extraction_timeout_secs);
673    let extraction_result = tokio::time::timeout(
674        timeout_dur,
675        extract_and_store(
676            content,
677            context_messages,
678            ctx.provider.clone(),
679            ctx.pool.clone(),
680            config.clone(),
681            post_extract_validator,
682            ctx.embedding_store.clone(),
683        ),
684    )
685    .await;
686
687    let (extraction_ok, new_entity_ids) = match extraction_result {
688        Ok(Ok(result)) => {
689            tracing::debug!(
690                entities = result.stats.entities_upserted,
691                edges = result.stats.edges_inserted,
692                "graph extraction completed"
693            );
694            ctx.extraction_count.fetch_add(1, Ordering::Relaxed);
695            (true, result.entity_ids)
696        }
697        Ok(Err(e)) => {
698            tracing::warn!("graph extraction failed: {e:#}");
699            ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
700            (false, vec![])
701        }
702        Err(_elapsed) => {
703            tracing::warn!("graph extraction timed out");
704            ctx.extraction_failures.fetch_add(1, Ordering::Relaxed);
705            (false, vec![])
706        }
707    };
708
709    run_note_linking(
710        extraction_ok,
711        &new_entity_ids,
712        ctx.pool.clone(),
713        ctx.embedding_store,
714        ctx.provider.clone(),
715        &config,
716    )
717    .await;
718
719    maybe_refresh_communities(
720        extraction_ok,
721        ctx.pool,
722        ctx.provider,
723        ctx.failure_counter,
724        &config,
725    )
726    .await;
727}
728
729/// Run A-MEM note linking after successful extraction when enabled.
730async fn run_note_linking(
731    extraction_ok: bool,
732    new_entity_ids: &[i64],
733    pool: DbPool,
734    embedding_store: Option<Arc<EmbeddingStore>>,
735    provider: AnyProvider,
736    config: &GraphExtractionConfig,
737) {
738    if !extraction_ok || !config.note_linking.enabled || new_entity_ids.is_empty() {
739        return;
740    }
741    let Some(store) = embedding_store else {
742        return;
743    };
744    let linking_timeout = std::time::Duration::from_secs(config.note_linking.timeout_secs);
745    match tokio::time::timeout(
746        linking_timeout,
747        link_memory_notes(new_entity_ids, pool, store, provider, &config.note_linking),
748    )
749    .await
750    {
751        Ok(stats) => {
752            tracing::debug!(
753                entities_processed = stats.entities_processed,
754                edges_created = stats.edges_created,
755                "note linking completed"
756            );
757        }
758        Err(_elapsed) => {
759            tracing::debug!("note linking timed out (partial edges may exist)");
760        }
761    }
762}
763
764/// Trigger community detection, graph eviction, and link-weight decay when the extraction
765/// count hits the configured refresh interval.
766async fn maybe_refresh_communities(
767    extraction_ok: bool,
768    pool: DbPool,
769    provider: AnyProvider,
770    failure_counter: Arc<std::sync::atomic::AtomicU64>,
771    config: &GraphExtractionConfig,
772) {
773    use crate::graph::GraphStore;
774
775    if !extraction_ok || config.community_refresh_interval == 0 {
776        return;
777    }
778
779    let store = GraphStore::new(pool.clone());
780    let extraction_count = store.extraction_count().await.unwrap_or(0);
781    if extraction_count == 0
782        || !i64::try_from(config.community_refresh_interval)
783            .is_ok_and(|interval| extraction_count % interval == 0)
784    {
785        return;
786    }
787
788    tracing::info!(extraction_count, "triggering community detection refresh");
789    let store2 = GraphStore::new(pool);
790    let provider2 = provider;
791    let retention_days = config.expired_edge_retention_days;
792    let max_cap = config.max_entities_cap;
793    let max_prompt_bytes = config.community_summary_max_prompt_bytes;
794    let concurrency = config.community_summary_concurrency;
795    let edge_chunk_size = config.lpa_edge_chunk_size;
796    let decay_lambda = config.link_weight_decay_lambda;
797    let decay_interval_secs = config.link_weight_decay_interval_secs;
798    tokio::spawn(async move {
799        match crate::graph::community::detect_communities(
800            &store2,
801            &provider2,
802            max_prompt_bytes,
803            concurrency,
804            edge_chunk_size,
805        )
806        .await
807        {
808            Ok(count) => {
809                tracing::info!(communities = count, "community detection complete");
810            }
811            Err(e) => {
812                tracing::warn!("community detection failed: {e:#}");
813                failure_counter.fetch_add(1, Ordering::Relaxed);
814            }
815        }
816        match crate::graph::community::run_graph_eviction(&store2, retention_days, max_cap).await {
817            Ok(stats) => {
818                tracing::info!(
819                    expired_edges = stats.expired_edges_deleted,
820                    orphan_entities = stats.orphan_entities_deleted,
821                    capped_entities = stats.capped_entities_deleted,
822                    "graph eviction complete"
823                );
824            }
825            Err(e) => {
826                tracing::warn!("graph eviction failed: {e:#}");
827            }
828        }
829
830        // Time-based link weight decay — independent of eviction cycle.
831        if decay_lambda > 0.0 && decay_interval_secs > 0 {
832            let now_secs = std::time::SystemTime::now()
833                .duration_since(std::time::UNIX_EPOCH)
834                .map_or(0, |d| d.as_secs());
835            let last_decay = store2
836                .get_metadata("last_link_weight_decay_at")
837                .await
838                .ok()
839                .flatten()
840                .and_then(|s| s.parse::<u64>().ok())
841                .unwrap_or(0);
842            if now_secs.saturating_sub(last_decay) >= decay_interval_secs {
843                match store2
844                    .decay_edge_retrieval_counts(decay_lambda, decay_interval_secs)
845                    .await
846                {
847                    Ok(affected) => {
848                        tracing::info!(affected, "link weight decay applied");
849                        let _ = store2
850                            .set_metadata("last_link_weight_decay_at", &now_secs.to_string())
851                            .await;
852                    }
853                    Err(e) => {
854                        tracing::warn!("link weight decay failed: {e:#}");
855                    }
856                }
857            }
858        }
859    });
860}
861
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use zeph_llm::any::AnyProvider;
    use zeph_llm::mock::MockProvider;

    use super::{GraphExtractionConfig, NoteLinkingConfig, extract_and_store};
    use crate::embedding_store::EmbeddingStore;
    use crate::graph::{EntityType, GraphStore};
    use crate::in_memory_store::InMemoryVectorStore;
    use crate::store::SqliteStore;

    /// Builds the shared fixture: a `GraphStore` over an in-memory `SQLite` database and an
    /// `EmbeddingStore` backed by an `InMemoryVectorStore` that uses the same pool.
    async fn setup() -> (GraphStore, Arc<EmbeddingStore>) {
        let sqlite = SqliteStore::new(":memory:").await.unwrap();
        let pool = sqlite.pool().clone();
        let vectors = Box::new(InMemoryVectorStore::new());
        let emb = Arc::new(EmbeddingStore::with_store(vectors, pool.clone()));
        (GraphStore::new(pool), emb)
    }

    /// Extraction config shared by the tests: limits of 10 entities / 10 edges and a 10 s
    /// extraction timeout, every other field left at its `Default` value.
    ///
    /// Tests that need to flip a single flag use struct-update syntax on top of this value.
    fn test_config() -> GraphExtractionConfig {
        GraphExtractionConfig {
            max_entities: 10,
            max_edges: 10,
            extraction_timeout_secs: 10,
            ..Default::default()
        }
    }

    /// Creates a `MockProvider` queued with exactly one chat response (`extraction_json`).
    ///
    /// Returned unwrapped so a test can still adjust public fields (e.g. embedding support)
    /// before wrapping it in `AnyProvider::Mock`.
    fn mock_with_response(extraction_json: &str) -> MockProvider {
        MockProvider::with_responses(vec![extraction_json.to_owned()])
    }

    /// Regression test for #1829: `extract_and_store()` must pass the provider to `EntityResolver`
    /// so that `store_entity_embedding()` is called and `qdrant_point_id` is set in `SQLite`.
    #[tokio::test]
    async fn extract_and_store_sets_qdrant_point_id_when_embedding_store_provided() {
        let (gs, emb) = setup().await;

        // The mock supports embeddings and answers the chat call with a valid extraction JSON.
        let mut mock = mock_with_response(
            r#"{"entities":[{"name":"Rust","type":"language","summary":"systems language"}],"edges":[]}"#,
        );
        mock.supports_embeddings = true;
        mock.embedding = vec![1.0_f32, 0.0, 0.0, 0.0];

        let result = extract_and_store(
            "Rust is a systems programming language.".to_owned(),
            vec![],
            AnyProvider::Mock(mock),
            gs.pool().clone(),
            test_config(),
            None,
            Some(Arc::clone(&emb)),
        )
        .await
        .unwrap();

        assert_eq!(
            result.stats.entities_upserted, 1,
            "one entity should be upserted"
        );

        // The entity must have a qdrant_point_id — this proves store_entity_embedding() was called.
        // Before the fix, EntityResolver was built without a provider, so embed() was never called
        // and qdrant_point_id remained NULL.
        let entity = gs
            .find_entity("rust", EntityType::Language)
            .await
            .unwrap()
            .expect("entity 'rust' must exist in SQLite");

        assert!(
            entity.qdrant_point_id.is_some(),
            "qdrant_point_id must be set when embedding_store + provider are both provided (regression for #1829)"
        );
    }

    /// When no `embedding_store` is provided, `extract_and_store()` must still work correctly
    /// (no embeddings stored, but entities are still upserted).
    #[tokio::test]
    async fn extract_and_store_without_embedding_store_still_upserts_entities() {
        let (gs, _emb) = setup().await;

        let mock = mock_with_response(
            r#"{"entities":[{"name":"Python","type":"language","summary":"scripting"}],"edges":[]}"#,
        );

        let result = extract_and_store(
            "Python is a scripting language.".to_owned(),
            vec![],
            AnyProvider::Mock(mock),
            gs.pool().clone(),
            test_config(),
            None,
            None, // no embedding_store
        )
        .await
        .unwrap();

        assert_eq!(result.stats.entities_upserted, 1);

        let entity = gs
            .find_entity("python", EntityType::Language)
            .await
            .unwrap()
            .expect("entity 'python' must exist");

        assert!(
            entity.qdrant_point_id.is_none(),
            "qdrant_point_id must remain None when no embedding_store is provided"
        );
    }

    /// Regression test for #2166: FTS5 entity writes must be visible to a new connection pool
    /// opened after extraction completes. Without `checkpoint_wal()` in `extract_and_store`,
    /// a fresh pool sees stale FTS5 shadow tables and `find_entities_fuzzy` returns empty.
    #[tokio::test]
    async fn extract_and_store_fts5_cross_session_visibility() {
        // `file` is held until the end of the test so the temp file outlives both sessions.
        let file = tempfile::NamedTempFile::new().expect("tempfile");
        let path = file.path().to_str().expect("valid path").to_string();

        // Session A: run extract_and_store on a file DB (not :memory:) so WAL is used.
        {
            let sqlite = SqliteStore::new(&path).await.unwrap();
            let mock = mock_with_response(
                r#"{"entities":[{"name":"Ferris","type":"concept","summary":"Rust mascot"}],"edges":[]}"#,
            );
            extract_and_store(
                "Ferris is the Rust mascot.".to_owned(),
                vec![],
                AnyProvider::Mock(mock),
                sqlite.pool().clone(),
                test_config(),
                None,
                None,
            )
            .await
            .unwrap();
        }

        // Session B: new pool — FTS5 must see the entity extracted in session A.
        let sqlite_b = SqliteStore::new(&path).await.unwrap();
        let gs_b = GraphStore::new(sqlite_b.pool().clone());
        let results = gs_b.find_entities_fuzzy("Ferris", 10).await.unwrap();
        assert!(
            !results.is_empty(),
            "FTS5 cross-session (#2166): entity extracted in session A must be visible in session B"
        );
    }

    /// Regression test for #2215: self-loop edges (source == target entity) must be silently
    /// skipped; no edge row should be inserted.
    #[tokio::test]
    async fn extract_and_store_skips_self_loop_edges() {
        let (gs, _emb) = setup().await;

        // LLM returns one entity and one self-loop edge (source == target).
        let mock = mock_with_response(
            r#"{
            "entities":[{"name":"Rust","type":"language","summary":"systems language"}],
            "edges":[{"source":"Rust","target":"Rust","relation":"is","fact":"Rust is Rust","edge_type":"semantic"}]
        }"#,
        );

        let result = extract_and_store(
            "Rust is a language.".to_owned(),
            vec![],
            AnyProvider::Mock(mock),
            gs.pool().clone(),
            test_config(),
            None,
            None,
        )
        .await
        .unwrap();

        assert_eq!(result.stats.entities_upserted, 1);
        assert_eq!(
            result.stats.edges_inserted, 0,
            "self-loop edge must be rejected (#2215)"
        );
    }

    /// When `apex_mem_enabled = true`, edges must be inserted via `insert_or_supersede`
    /// (the APEX-MEM append-only path) instead of the legacy `resolve_edge_typed` path.
    /// Verifies that the edge is still counted as inserted, that exactly one edge row exists
    /// between the two entities, and that its `relation` keeps the original casing.
    #[tokio::test]
    async fn apex_mem_path_inserts_edge_via_insert_or_supersede() {
        let (gs, _emb) = setup().await;

        let mock = mock_with_response(
            r#"{
            "entities":[
                {"name":"Alice","type":"person","summary":"a person"},
                {"name":"Bob","type":"person","summary":"another person"}
            ],
            "edges":[
                {"source":"Alice","target":"Bob","relation":"KNOWS","fact":"Alice knows Bob","edge_type":"semantic"}
            ]
        }"#,
        );

        let config = GraphExtractionConfig {
            apex_mem_enabled: true,
            ..test_config()
        };

        let result = extract_and_store(
            "Alice knows Bob.".to_owned(),
            vec![],
            AnyProvider::Mock(mock),
            gs.pool().clone(),
            config,
            None,
            None,
        )
        .await
        .unwrap();

        assert_eq!(result.stats.entities_upserted, 2, "two entities expected");
        assert_eq!(
            result.stats.edges_inserted, 1,
            "APEX-MEM path must insert the edge and count it (#3631)"
        );

        // Verify the edge row exists and its relation preserves display casing.
        let alice_id = gs
            .find_entity("alice", EntityType::Person)
            .await
            .unwrap()
            .expect("entity 'alice' must exist")
            .id
            .0;
        let bob_id = gs
            .find_entity("bob", EntityType::Person)
            .await
            .unwrap()
            .expect("entity 'bob' must exist")
            .id
            .0;
        let edges = gs.edges_exact(alice_id, bob_id).await.unwrap();
        assert_eq!(edges.len(), 1, "exactly one edge expected");
        // canonical_relation is lowercased; relation field preserves original casing post-strip
        assert_eq!(
            edges[0].relation, "KNOWS",
            "display relation must preserve original casing"
        );
    }

    /// Regression for #4297: `embed_work_items` must return an empty Vec (fail-open) when the
    /// batch `join_all` embed call exceeds the 30 s global timeout.
    ///
    /// The runtime starts with its clock paused, so the 30 s timeout elapses in virtual time
    /// and the test does not actually wait.
    #[tokio::test(start_paused = true)]
    async fn embed_work_items_timeout_returns_empty() {
        // Embed delay (31 s) is deliberately longer than the 30 s timeout configured below.
        let mut mock = MockProvider::default();
        mock.supports_embeddings = true;
        mock.embed_delay_ms = 31_000;
        let provider = AnyProvider::Mock(mock);

        let work_items = vec![super::EntityWorkItem {
            entity_id: 1,
            canonical_name: "Alice".to_owned(),
            embed_text: "Alice".to_owned(),
            self_point_id: None,
        }];

        let cfg = NoteLinkingConfig {
            timeout_secs: 30,
            ..NoteLinkingConfig::default()
        };
        let result = super::embed_work_items(&work_items, &provider, &cfg).await;
        assert!(
            result.is_empty(),
            "embed_work_items must return empty Vec on 30 s timeout (fail-open)"
        );
    }
}