Skip to main content

zeph_memory/graph/
community.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashMap;
5use std::sync::Arc;
6#[allow(unused_imports)]
7use zeph_db::sql;
8
9use futures::TryStreamExt as _;
10use petgraph::Graph;
11use petgraph::graph::NodeIndex;
12use tokio::sync::Semaphore;
13use tokio::task::JoinSet;
14use zeph_llm::LlmProvider as _;
15use zeph_llm::any::AnyProvider;
16use zeph_llm::provider::{Message, Role};
17
18use crate::error::MemoryError;
19
20use super::store::GraphStore;
21use super::types::Entity;
22
/// Upper bound on label-propagation sweeps; `run_label_propagation` stops earlier as soon as a
/// full sweep leaves every label unchanged.
const MAX_LABEL_PROPAGATION_ITERATIONS: usize = 50;
24
/// Strip control characters, Unicode bidi controls, line/paragraph separators, and
/// zero-width/invisible formatting characters from `s`.
///
/// Entity names and edge facts come from untrusted text and are interpolated into LLM prompts.
/// Removing these characters prevents them from smuggling line breaks, reordering displayed
/// text, or hiding content inside a prompt.
///
/// Filtered categories:
/// - All Unicode control characters (`Cc` category: ASCII C0 controls, DEL, and C1 controls)
/// - Arabic letter mark: U+061C (bidi formatting character)
/// - Zero-width characters and directional marks: U+200B–U+200F (ZWSP, ZWNJ, ZWJ, LRM, RLM)
/// - Line and paragraph separators: U+2028, U+2029. These behave like line breaks but are
///   not in `Cc`, so `char::is_control` alone lets them through.
/// - Bidi embedding/override controls: U+202A–U+202E
/// - Word joiner and invisible operators: U+2060–U+2064
/// - Bidi isolate controls: U+2066–U+2069
/// - Byte-order mark / zero-width no-break space: U+FEFF
fn scrub_content(s: &str) -> String {
    s.chars()
        .filter(|&c| {
            !c.is_control()
                && !matches!(
                    u32::from(c),
                    0x061C
                        | 0x200B..=0x200F
                        | 0x2028..=0x2029
                        | 0x202A..=0x202E
                        | 0x2060..=0x2064
                        | 0x2066..=0x2069
                        | 0xFEFF
                )
        })
        .collect()
}
43
/// Counters describing what a single `run_graph_eviction` pass removed.
#[derive(Default, Debug)]
pub struct GraphEvictionStats {
    /// Rows removed by the expired-edge cleanup step.
    pub expired_edges_deleted: usize,
    /// Rows removed by the orphan-entity cleanup step.
    pub orphan_entities_deleted: usize,
    /// Entities removed to enforce the entity cap (0 when no cap is configured).
    pub capped_entities_deleted: usize,
}
51
/// Cap `prompt` at `max_bytes` bytes, cutting on a UTF-8 character boundary and marking the
/// cut with a trailing `"..."`.
///
/// - `max_bytes == 0` yields an empty string, which callers treat as "summaries disabled".
/// - A prompt that already fits is returned unchanged.
/// - Otherwise the text is shortened to the largest char boundary `<= max_bytes`, and `"..."`
///   is appended. The result can therefore be up to 3 bytes longer than `max_bytes`.
fn truncate_prompt(prompt: String, max_bytes: usize) -> String {
    match max_bytes {
        0 => String::new(),
        limit if prompt.len() <= limit => prompt,
        limit => {
            // Byte index 0 is always a char boundary, so the search cannot come up empty.
            let cut = (0..=limit)
                .rev()
                .find(|&idx| prompt.is_char_boundary(idx))
                .unwrap_or(0);
            let mut shortened = prompt;
            shortened.truncate(cut);
            shortened.push_str("...");
            shortened
        }
    }
}
67
68/// Compute a BLAKE3 fingerprint for a community partition.
69///
70/// The fingerprint is derived from sorted entity IDs and sorted intra-community edge IDs,
71/// ensuring both membership and edge mutations trigger re-summarization.
72/// BLAKE3 is used (not `DefaultHasher`) to guarantee determinism across process restarts.
73fn compute_partition_fingerprint(entity_ids: &[i64], intra_edge_ids: &[i64]) -> String {
74    let mut hasher = blake3::Hasher::new();
75    let mut sorted_entities = entity_ids.to_vec();
76    sorted_entities.sort_unstable();
77    hasher.update(b"entities");
78    for id in &sorted_entities {
79        hasher.update(&id.to_le_bytes());
80    }
81    let mut sorted_edges = intra_edge_ids.to_vec();
82    sorted_edges.sort_unstable();
83    hasher.update(b"edges");
84    for id in &sorted_edges {
85        hasher.update(&id.to_le_bytes());
86    }
87    hasher.finalize().to_hex().to_string()
88}
89
/// Inputs for one community's LLM summarization, gathered up front so the spawned
/// summarization task can own them.
struct CommunityData {
    /// Member entity IDs, in partition order.
    entity_ids: Vec<i64>,
    /// Scrubbed member names (members absent from the name map are omitted).
    entity_names: Vec<String>,
    /// Scrubbed facts of edges whose endpoints are both members.
    intra_facts: Vec<String>,
    /// Partition fingerprint to store alongside the community.
    fingerprint: String,
    /// Community name: up to three member names plus a `[label_index]` suffix.
    name: String,
}
98
99type UndirectedGraph = Graph<i64, (), petgraph::Undirected>;
100
101async fn build_entity_graph_and_maps(
102    store: &GraphStore,
103    entities: &[Entity],
104    edge_chunk_size: usize,
105) -> Result<
106    (
107        UndirectedGraph,
108        HashMap<(i64, i64), Vec<String>>,
109        HashMap<(i64, i64), Vec<i64>>,
110    ),
111    MemoryError,
112> {
113    let mut graph = UndirectedGraph::new_undirected();
114    let mut node_map: HashMap<i64, NodeIndex> = HashMap::new();
115
116    for entity in entities {
117        let idx = graph.add_node(entity.id.0);
118        node_map.insert(entity.id.0, idx);
119    }
120
121    let mut edge_facts_map: HashMap<(i64, i64), Vec<String>> = HashMap::new();
122    let mut edge_id_map: HashMap<(i64, i64), Vec<i64>> = HashMap::new();
123
124    if edge_chunk_size == 0 {
125        let edges: Vec<_> = store.all_active_edges_stream().try_collect().await?;
126        for edge in &edges {
127            if let (Some(&src_idx), Some(&tgt_idx)) = (
128                node_map.get(&edge.source_entity_id),
129                node_map.get(&edge.target_entity_id),
130            ) {
131                graph.add_edge(src_idx, tgt_idx, ());
132            }
133            let key = (edge.source_entity_id, edge.target_entity_id);
134            edge_facts_map
135                .entry(key)
136                .or_default()
137                .push(edge.fact.clone());
138            edge_id_map.entry(key).or_default().push(edge.id);
139        }
140    } else {
141        let limit = i64::try_from(edge_chunk_size).unwrap_or(i64::MAX);
142        let mut last_id: i64 = 0;
143        loop {
144            let chunk = store.edges_after_id(last_id, limit).await?;
145            if chunk.is_empty() {
146                break;
147            }
148            last_id = chunk.last().expect("non-empty chunk has a last element").id;
149            for edge in &chunk {
150                if let (Some(&src_idx), Some(&tgt_idx)) = (
151                    node_map.get(&edge.source_entity_id),
152                    node_map.get(&edge.target_entity_id),
153                ) {
154                    graph.add_edge(src_idx, tgt_idx, ());
155                }
156                let key = (edge.source_entity_id, edge.target_entity_id);
157                edge_facts_map
158                    .entry(key)
159                    .or_default()
160                    .push(edge.fact.clone());
161                edge_id_map.entry(key).or_default().push(edge.id);
162            }
163        }
164    }
165
166    Ok((graph, edge_facts_map, edge_id_map))
167}
168
169fn run_label_propagation(graph: &UndirectedGraph) -> HashMap<usize, Vec<i64>> {
170    let mut labels: Vec<usize> = (0..graph.node_count()).collect();
171
172    for _ in 0..MAX_LABEL_PROPAGATION_ITERATIONS {
173        let mut changed = false;
174        for node_idx in graph.node_indices() {
175            let neighbors: Vec<NodeIndex> = graph.neighbors(node_idx).collect();
176            if neighbors.is_empty() {
177                continue;
178            }
179            let mut freq: HashMap<usize, usize> = HashMap::new();
180            for &nbr in &neighbors {
181                *freq.entry(labels[nbr.index()]).or_insert(0) += 1;
182            }
183            let max_count = *freq.values().max().unwrap_or(&0);
184            let best_label = freq
185                .iter()
186                .filter(|&(_, count)| *count == max_count)
187                .map(|(&label, _)| label)
188                .min()
189                .unwrap_or(labels[node_idx.index()]);
190            if labels[node_idx.index()] != best_label {
191                labels[node_idx.index()] = best_label;
192                changed = true;
193            }
194        }
195        if !changed {
196            break;
197        }
198    }
199
200    let mut communities: HashMap<usize, Vec<i64>> = HashMap::new();
201    for node_idx in graph.node_indices() {
202        let entity_id = graph[node_idx];
203        communities
204            .entry(labels[node_idx.index()])
205            .or_default()
206            .push(entity_id);
207    }
208    communities.retain(|_, members| members.len() >= 2);
209    communities
210}
211
212struct ClassifyResult {
213    to_summarize: Vec<CommunityData>,
214    unchanged_count: usize,
215    new_fingerprints: std::collections::HashSet<String>,
216}
217
218fn classify_communities(
219    communities: &HashMap<usize, Vec<i64>>,
220    edge_facts_map: &HashMap<(i64, i64), Vec<String>>,
221    edge_id_map: &HashMap<(i64, i64), Vec<i64>>,
222    entity_name_map: &HashMap<i64, &str>,
223    stored_fingerprints: &HashMap<String, i64>,
224    sorted_labels: &[usize],
225) -> ClassifyResult {
226    let mut to_summarize: Vec<CommunityData> = Vec::new();
227    let mut unchanged_count = 0usize;
228    let mut new_fingerprints: std::collections::HashSet<String> = std::collections::HashSet::new();
229
230    for (label_index, &label) in sorted_labels.iter().enumerate() {
231        let entity_ids = communities[&label].as_slice();
232        let member_set: std::collections::HashSet<i64> = entity_ids.iter().copied().collect();
233
234        let mut intra_facts: Vec<String> = Vec::new();
235        let mut intra_edge_ids: Vec<i64> = Vec::new();
236        for (&(src, tgt), facts) in edge_facts_map {
237            if member_set.contains(&src) && member_set.contains(&tgt) {
238                intra_facts.extend(facts.iter().map(|f| scrub_content(f)));
239                if let Some(ids) = edge_id_map.get(&(src, tgt)) {
240                    intra_edge_ids.extend_from_slice(ids);
241                }
242            }
243        }
244
245        let fingerprint = compute_partition_fingerprint(entity_ids, &intra_edge_ids);
246        new_fingerprints.insert(fingerprint.clone());
247
248        if stored_fingerprints.contains_key(&fingerprint) {
249            unchanged_count += 1;
250            continue;
251        }
252
253        let entity_names: Vec<String> = entity_ids
254            .iter()
255            .filter_map(|id| entity_name_map.get(id).map(|&s| scrub_content(s)))
256            .collect();
257
258        // Append label_index to prevent ON CONFLICT(name) collisions when two communities
259        // share the same top-3 entity names across detect_communities runs (IC-SIG-02).
260        let base_name = entity_names
261            .iter()
262            .take(3)
263            .cloned()
264            .collect::<Vec<_>>()
265            .join(", ");
266        let name = format!("{base_name} [{label_index}]");
267
268        to_summarize.push(CommunityData {
269            entity_ids: entity_ids.to_vec(),
270            entity_names,
271            intra_facts,
272            fingerprint,
273            name,
274        });
275    }
276
277    ClassifyResult {
278        to_summarize,
279        unchanged_count,
280        new_fingerprints,
281    }
282}
283
284async fn summarize_and_upsert_communities(
285    store: &GraphStore,
286    provider: &AnyProvider,
287    to_summarize: Vec<CommunityData>,
288    concurrency: usize,
289    community_summary_max_prompt_bytes: usize,
290) -> Result<usize, MemoryError> {
291    let semaphore = Arc::new(Semaphore::new(concurrency.max(1)));
292    let mut join_set: JoinSet<(String, String, Vec<i64>, String)> = JoinSet::new();
293
294    for data in to_summarize {
295        let provider = provider.clone();
296        let sem = Arc::clone(&semaphore);
297        let max_bytes = community_summary_max_prompt_bytes;
298        join_set.spawn(async move {
299            let _permit = sem.acquire().await.expect("semaphore is never closed");
300            let summary = match generate_community_summary(
301                &provider,
302                &data.entity_names,
303                &data.intra_facts,
304                max_bytes,
305            )
306            .await
307            {
308                Ok(text) => text,
309                Err(e) => {
310                    tracing::warn!(community = %data.name, "community summary generation failed: {e:#}");
311                    String::new()
312                }
313            };
314            (data.name, summary, data.entity_ids, data.fingerprint)
315        });
316    }
317
318    // Collect results — handle task panics explicitly (HIGH-01 fix).
319    let mut results: Vec<(String, String, Vec<i64>, String)> = Vec::new();
320    while let Some(outcome) = join_set.join_next().await {
321        match outcome {
322            Ok(tuple) => results.push(tuple),
323            Err(e) => {
324                tracing::error!(
325                    panicked = e.is_panic(),
326                    cancelled = e.is_cancelled(),
327                    "community summary task failed"
328                );
329            }
330        }
331    }
332
333    results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
334
335    let mut count = 0usize;
336    for (name, summary, entity_ids, fingerprint) in results {
337        store
338            .upsert_community(&name, &summary, &entity_ids, Some(&fingerprint))
339            .await?;
340        count += 1;
341    }
342
343    Ok(count)
344}
345
346/// Run label propagation on the full entity graph, generate community summaries via LLM,
347/// and upsert results to `SQLite`.
348///
349/// Returns the number of communities detected (with `>= 2` entities).
350///
351/// Unchanged communities (same entity membership and intra-community edges) are skipped —
352/// their existing summaries are preserved without LLM calls (incremental detection, #1262).
353/// LLM calls for changed communities are parallelized via a `JoinSet` bounded by a
354/// semaphore with `concurrency` permits (#1260).
355///
356/// # Panics
357///
358/// Does not panic in normal operation. The `semaphore.acquire().await.expect(...)` call is
359/// infallible because the semaphore is never closed during the lifetime of this function.
360///
361/// # Errors
362///
363/// Returns an error if `SQLite` queries or LLM calls fail.
364pub async fn detect_communities(
365    store: &GraphStore,
366    provider: &AnyProvider,
367    community_summary_max_prompt_bytes: usize,
368    concurrency: usize,
369    edge_chunk_size: usize,
370) -> Result<usize, MemoryError> {
371    let edge_chunk_size = if edge_chunk_size == 0 {
372        tracing::warn!(
373            "edge_chunk_size is 0, which would load all edges into memory; \
374             using safe default of 10_000"
375        );
376        10_000_usize
377    } else {
378        edge_chunk_size
379    };
380
381    let entities = store.all_entities().await?;
382    if entities.len() < 2 {
383        return Ok(0);
384    }
385
386    let (graph, edge_facts_map, edge_id_map) =
387        build_entity_graph_and_maps(store, &entities, edge_chunk_size).await?;
388
389    let communities = run_label_propagation(&graph);
390
391    let entity_name_map: HashMap<i64, &str> =
392        entities.iter().map(|e| (e.id.0, e.name.as_str())).collect();
393    let stored_fingerprints = store.community_fingerprints().await?;
394
395    let mut sorted_labels: Vec<usize> = communities.keys().copied().collect();
396    sorted_labels.sort_unstable();
397
398    let ClassifyResult {
399        to_summarize,
400        unchanged_count,
401        new_fingerprints,
402    } = classify_communities(
403        &communities,
404        &edge_facts_map,
405        &edge_id_map,
406        &entity_name_map,
407        &stored_fingerprints,
408        &sorted_labels,
409    );
410
411    tracing::debug!(
412        total = sorted_labels.len(),
413        unchanged = unchanged_count,
414        to_summarize = to_summarize.len(),
415        "community detection: partition classification complete"
416    );
417
418    // Delete dissolved communities (fingerprints no longer in new partition set).
419    for (stored_fp, community_id) in &stored_fingerprints {
420        if !new_fingerprints.contains(stored_fp.as_str()) {
421            store.delete_community_by_id(*community_id).await?;
422        }
423    }
424
425    let new_count = summarize_and_upsert_communities(
426        store,
427        provider,
428        to_summarize,
429        concurrency,
430        community_summary_max_prompt_bytes,
431    )
432    .await?;
433
434    Ok(unchanged_count + new_count)
435}
436
437/// Assign a single entity to an existing community via neighbor majority vote.
438///
439/// Returns `Some(community_id)` if assigned, `None` if no neighbors have communities.
440///
441/// When an entity is added, the stored fingerprint is cleared (`NULL`) so the next
442/// `detect_communities` run will re-summarize the affected community (CRIT-02 fix).
443///
444/// # Errors
445///
446/// Returns an error if `SQLite` queries fail.
447pub async fn assign_to_community(
448    store: &GraphStore,
449    entity_id: i64,
450) -> Result<Option<i64>, MemoryError> {
451    let edges = store.edges_for_entity(entity_id).await?;
452    if edges.is_empty() {
453        return Ok(None);
454    }
455
456    let neighbor_ids: Vec<i64> = edges
457        .iter()
458        .map(|e| {
459            if e.source_entity_id == entity_id {
460                e.target_entity_id
461            } else {
462                e.source_entity_id
463            }
464        })
465        .collect();
466
467    let mut community_votes: HashMap<i64, usize> = HashMap::new();
468    for &nbr_id in &neighbor_ids {
469        if let Some(community) = store.community_for_entity(nbr_id).await? {
470            *community_votes.entry(community.id).or_insert(0) += 1;
471        }
472    }
473
474    if community_votes.is_empty() {
475        return Ok(None);
476    }
477
478    // Majority vote — tie-break by smallest community_id.
479    // community_votes is non-empty (checked above), so max_by always returns Some.
480    let Some((&best_community_id, _)) =
481        community_votes
482            .iter()
483            .max_by(|&(&id_a, &count_a), &(&id_b, &count_b)| {
484                count_a.cmp(&count_b).then(id_b.cmp(&id_a))
485            })
486    else {
487        return Ok(None);
488    };
489
490    if let Some(mut target) = store.find_community_by_id(best_community_id).await? {
491        if !target.entity_ids.iter().any(|eid| eid.0 == entity_id) {
492            target.entity_ids.push(crate::types::EntityId(entity_id));
493            let raw_ids: Vec<i64> = target.entity_ids.iter().map(|eid| eid.0).collect();
494            store
495                .upsert_community(&target.name, &target.summary, &raw_ids, None)
496                .await?;
497            // Clear fingerprint to invalidate cache — next detect_communities will re-summarize.
498            store.clear_community_fingerprint(best_community_id).await?;
499        }
500        return Ok(Some(best_community_id));
501    }
502
503    Ok(None)
504}
505
506/// Remove `Qdrant` points for entities that no longer exist in `SQLite`.
507///
508/// Returns the number of stale points deleted.
509///
510/// # Errors
511///
512/// Returns an error if `Qdrant` operations fail.
513pub async fn cleanup_stale_entity_embeddings(
514    store: &GraphStore,
515    embeddings: &crate::embedding_store::EmbeddingStore,
516) -> Result<usize, MemoryError> {
517    const ENTITY_COLLECTION: &str = "zeph_graph_entities";
518
519    // Enumerate all (point_id, entity_id) pairs in the Qdrant entity collection.
520    // Points without `entity_id_str` (legacy writes) are silently skipped; they will
521    // gain the field on the next merge_entity / store_entity_embedding call.
522    let pairs = embeddings.scroll_all_entity_ids(ENTITY_COLLECTION).await?;
523    if pairs.is_empty() {
524        return Ok(0);
525    }
526
527    let qdrant_ids: Vec<i64> = pairs.iter().map(|(_, eid)| *eid).collect();
528    let live: std::collections::HashSet<i64> = store
529        .entity_ids_in(&qdrant_ids)
530        .await?
531        .into_iter()
532        .collect();
533
534    let stale_point_ids: Vec<String> = pairs
535        .into_iter()
536        .filter_map(|(pid, eid)| (!live.contains(&eid)).then_some(pid))
537        .collect();
538
539    if stale_point_ids.is_empty() {
540        return Ok(0);
541    }
542
543    let count = stale_point_ids.len();
544    embeddings
545        .delete_from_collection(ENTITY_COLLECTION, stale_point_ids)
546        .await?;
547    Ok(count)
548}
549
550/// Run graph eviction: clean expired edges, orphan entities, and cap entity count.
551///
552/// # Errors
553///
554/// Returns an error if `SQLite` queries fail.
555pub async fn run_graph_eviction(
556    store: &GraphStore,
557    expired_edge_retention_days: u32,
558    max_entities: usize,
559) -> Result<GraphEvictionStats, MemoryError> {
560    let expired_edges_deleted = store
561        .delete_expired_edges(expired_edge_retention_days)
562        .await?;
563    let orphan_entities_deleted = store
564        .delete_orphan_entities(expired_edge_retention_days)
565        .await?;
566    let capped_entities_deleted = if max_entities > 0 {
567        store.cap_entities(max_entities).await?
568    } else {
569        0
570    };
571
572    Ok(GraphEvictionStats {
573        expired_edges_deleted,
574        orphan_entities_deleted,
575        capped_entities_deleted,
576    })
577}
578
579async fn generate_community_summary(
580    provider: &AnyProvider,
581    entity_names: &[String],
582    edge_facts: &[String],
583    max_prompt_bytes: usize,
584) -> Result<String, MemoryError> {
585    let entities_str = entity_names.join(", ");
586    // Cap facts at 20 to bound prompt size; data is already scrubbed upstream.
587    let facts_str = edge_facts
588        .iter()
589        .take(20)
590        .map(|f| format!("- {f}"))
591        .collect::<Vec<_>>()
592        .join("\n");
593
594    let raw_prompt = format!(
595        "Summarize the following group of related entities and their relationships \
596         into a single paragraph (2-3 sentences). Focus on the theme that connects \
597         them and the key relationships.\n\nEntities: {entities_str}\n\
598         Relationships:\n{facts_str}\n\nSummary:"
599    );
600
601    let original_bytes = raw_prompt.len();
602    let truncated = raw_prompt.len() > max_prompt_bytes;
603    let prompt = truncate_prompt(raw_prompt, max_prompt_bytes);
604    if prompt.is_empty() {
605        return Ok(String::new());
606    }
607    if truncated {
608        tracing::warn!(
609            entity_count = entity_names.len(),
610            original_bytes,
611            truncated_bytes = prompt.len(),
612            "community summary prompt truncated"
613        );
614    }
615
616    let messages = [Message::from_legacy(Role::User, prompt)];
617    let response: String = provider.chat(&messages).await.map_err(MemoryError::Llm)?;
618    Ok(response)
619}
620
621#[cfg(test)]
622mod tests {
623    use std::sync::{Arc, Mutex};
624
625    use super::*;
626    use crate::graph::types::EntityType;
627    use crate::store::SqliteStore;
628
629    async fn setup() -> GraphStore {
630        let store = SqliteStore::new(":memory:").await.unwrap();
631        GraphStore::new(store.pool().clone())
632    }
633
634    fn mock_provider() -> AnyProvider {
635        AnyProvider::Mock(zeph_llm::mock::MockProvider::default())
636    }
637
638    fn recording_provider() -> (
639        AnyProvider,
640        Arc<Mutex<Vec<Vec<zeph_llm::provider::Message>>>>,
641    ) {
642        let (mock, buf) = zeph_llm::mock::MockProvider::default().with_recording();
643        (AnyProvider::Mock(mock), buf)
644    }
645
646    #[tokio::test]
647    async fn test_detect_communities_empty_graph() {
648        let store = setup().await;
649        let provider = mock_provider();
650        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
651            .await
652            .unwrap();
653        assert_eq!(count, 0);
654    }
655
656    #[tokio::test]
657    async fn test_detect_communities_single_entity() {
658        let store = setup().await;
659        let provider = mock_provider();
660        store
661            .upsert_entity("Solo", "Solo", EntityType::Concept, None)
662            .await
663            .unwrap();
664        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
665            .await
666            .unwrap();
667        assert_eq!(count, 0, "single isolated entity must not form a community");
668    }
669
670    #[tokio::test]
671    async fn test_single_entity_community_filtered() {
672        let store = setup().await;
673        let provider = mock_provider();
674
675        // Create 3 connected entities (cluster A) and 1 isolated entity.
676        let a = store
677            .upsert_entity("A", "A", EntityType::Concept, None)
678            .await
679            .unwrap()
680            .0;
681        let b = store
682            .upsert_entity("B", "B", EntityType::Concept, None)
683            .await
684            .unwrap()
685            .0;
686        let c = store
687            .upsert_entity("C", "C", EntityType::Concept, None)
688            .await
689            .unwrap()
690            .0;
691        let iso = store
692            .upsert_entity("Isolated", "Isolated", EntityType::Concept, None)
693            .await
694            .unwrap()
695            .0;
696
697        store
698            .insert_edge(a, b, "r", "A relates B", 1.0, None)
699            .await
700            .unwrap();
701        store
702            .insert_edge(b, c, "r", "B relates C", 1.0, None)
703            .await
704            .unwrap();
705
706        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
707            .await
708            .unwrap();
709        // Isolated entity has no edges — must NOT be persisted as a community.
710        assert_eq!(count, 1, "only the 3-entity cluster should be detected");
711
712        let communities = store.all_communities().await.unwrap();
713        assert_eq!(communities.len(), 1);
714        assert!(
715            !communities[0].entity_ids.iter().any(|eid| eid.0 == iso),
716            "isolated entity must not be in any community"
717        );
718    }
719
720    #[tokio::test]
721    async fn test_label_propagation_basic() {
722        let store = setup().await;
723        let provider = mock_provider();
724
725        // Create 4 clusters of 3 entities each (12 entities total), fully isolated.
726        let mut cluster_ids: Vec<Vec<i64>> = Vec::new();
727        for cluster in 0..4_i64 {
728            let mut ids = Vec::new();
729            for node in 0..3_i64 {
730                let name = format!("c{cluster}_n{node}");
731                let id = store
732                    .upsert_entity(&name, &name, EntityType::Concept, None)
733                    .await
734                    .unwrap()
735                    .0;
736                ids.push(id);
737            }
738            // Connect nodes within cluster (chain: 0-1-2).
739            store
740                .insert_edge(ids[0], ids[1], "r", "f", 1.0, None)
741                .await
742                .unwrap();
743            store
744                .insert_edge(ids[1], ids[2], "r", "f", 1.0, None)
745                .await
746                .unwrap();
747            cluster_ids.push(ids);
748        }
749
750        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
751            .await
752            .unwrap();
753        assert_eq!(count, 4, "expected 4 communities, one per cluster");
754
755        let communities = store.all_communities().await.unwrap();
756        assert_eq!(communities.len(), 4);
757
758        // Each cluster's entity IDs must appear in exactly one community.
759        for ids in &cluster_ids {
760            let found = communities
761                .iter()
762                .filter(|c| {
763                    ids.iter()
764                        .any(|id| c.entity_ids.iter().any(|eid| eid.0 == *id))
765                })
766                .count();
767            assert_eq!(
768                found, 1,
769                "all nodes of a cluster must be in the same community"
770            );
771        }
772    }
773
774    #[tokio::test]
775    async fn test_all_isolated_nodes() {
776        let store = setup().await;
777        let provider = mock_provider();
778
779        // Insert 5 entities with no edges at all.
780        for i in 0..5_i64 {
781            store
782                .upsert_entity(
783                    &format!("iso_{i}"),
784                    &format!("iso_{i}"),
785                    EntityType::Concept,
786                    None,
787                )
788                .await
789                .unwrap();
790        }
791
792        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
793            .await
794            .unwrap();
795        assert_eq!(count, 0, "zero-edge graph must produce no communities");
796        assert_eq!(store.community_count().await.unwrap(), 0);
797    }
798
799    #[tokio::test]
800    async fn test_eviction_expired_edges() {
801        let store = setup().await;
802
803        let a = store
804            .upsert_entity("EA", "EA", EntityType::Concept, None)
805            .await
806            .unwrap()
807            .0;
808        let b = store
809            .upsert_entity("EB", "EB", EntityType::Concept, None)
810            .await
811            .unwrap()
812            .0;
813        let edge_id = store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
814        store.invalidate_edge(edge_id).await.unwrap();
815
816        // Manually set expired_at to a date far in the past to trigger deletion.
817        zeph_db::query(sql!(
818            "UPDATE graph_edges SET expired_at = datetime('now', '-200 days') WHERE id = ?1"
819        ))
820        .bind(edge_id)
821        .execute(store.pool())
822        .await
823        .unwrap();
824
825        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
826        assert_eq!(stats.expired_edges_deleted, 1);
827    }
828
829    #[tokio::test]
830    async fn test_eviction_orphan_entities() {
831        let store = setup().await;
832
833        let iso = store
834            .upsert_entity("Orphan", "Orphan", EntityType::Concept, None)
835            .await
836            .unwrap()
837            .0;
838
839        // Set last_seen_at to far in the past.
840        zeph_db::query(sql!(
841            "UPDATE graph_entities SET last_seen_at = datetime('now', '-200 days') WHERE id = ?1"
842        ))
843        .bind(iso)
844        .execute(store.pool())
845        .await
846        .unwrap();
847
848        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
849        assert_eq!(stats.orphan_entities_deleted, 1);
850    }
851
852    #[tokio::test]
853    async fn test_eviction_entity_cap() {
854        let store = setup().await;
855
856        // Insert 5 entities with no edges (so they can be capped).
857        for i in 0..5_i64 {
858            let name = format!("cap_entity_{i}");
859            store
860                .upsert_entity(&name, &name, EntityType::Concept, None)
861                .await
862                .unwrap();
863        }
864
865        let stats = run_graph_eviction(&store, 90, 3).await.unwrap();
866        assert_eq!(
867            stats.capped_entities_deleted, 2,
868            "should delete 5-3=2 entities"
869        );
870        assert_eq!(store.entity_count().await.unwrap(), 3);
871    }
872
873    #[tokio::test]
874    async fn test_assign_to_community_no_neighbors() {
875        let store = setup().await;
876        let entity_id = store
877            .upsert_entity("Loner", "Loner", EntityType::Concept, None)
878            .await
879            .unwrap()
880            .0;
881
882        let result = assign_to_community(&store, entity_id).await.unwrap();
883        assert!(result.is_none());
884    }
885
886    #[tokio::test]
887    async fn test_extraction_count_persistence() {
888        use tempfile::NamedTempFile;
889        // Create a real on-disk SQLite DB to verify persistence across store instances.
890        let tmp = NamedTempFile::new().unwrap();
891        let path = tmp.path().to_str().unwrap().to_owned();
892
893        let store1 = {
894            let s = crate::store::SqliteStore::new(&path).await.unwrap();
895            GraphStore::new(s.pool().clone())
896        };
897
898        store1.set_metadata("extraction_count", "0").await.unwrap();
899        for i in 1..=5_i64 {
900            store1
901                .set_metadata("extraction_count", &i.to_string())
902                .await
903                .unwrap();
904        }
905
906        // Open a second handle to the same file and verify the value persists.
907        let store2 = {
908            let s = crate::store::SqliteStore::new(&path).await.unwrap();
909            GraphStore::new(s.pool().clone())
910        };
911        assert_eq!(store2.extraction_count().await.unwrap(), 5);
912    }
913
914    #[test]
915    fn test_scrub_content_ascii_control() {
916        // Newline, carriage return, null byte, tab (all ASCII control chars) must be stripped.
917        let input = "hello\nworld\r\x00\x01\x09end";
918        assert_eq!(scrub_content(input), "helloworldend");
919    }
920
921    #[test]
922    fn test_scrub_content_bidi_overrides() {
923        // U+202A LEFT-TO-RIGHT EMBEDDING, U+202E RIGHT-TO-LEFT OVERRIDE,
924        // U+2066 LEFT-TO-RIGHT ISOLATE, U+2069 POP DIRECTIONAL ISOLATE.
925        let input = "safe\u{202A}inject\u{202E}end\u{2066}iso\u{2069}done".to_string();
926        assert_eq!(scrub_content(&input), "safeinjectendisodone");
927    }
928
929    #[test]
930    fn test_scrub_content_zero_width() {
931        // U+200B ZERO WIDTH SPACE, U+200C ZERO WIDTH NON-JOINER, U+200D ZERO WIDTH JOINER,
932        // U+200F RIGHT-TO-LEFT MARK.
933        let input = "a\u{200B}b\u{200C}c\u{200D}d\u{200F}e".to_string();
934        assert_eq!(scrub_content(&input), "abcde");
935    }
936
937    #[test]
938    fn test_scrub_content_bom() {
939        // U+FEFF BYTE ORDER MARK must be stripped.
940        let input = "\u{FEFF}hello".to_string();
941        assert_eq!(scrub_content(&input), "hello");
942    }
943
944    #[test]
945    fn test_scrub_content_clean_string_unchanged() {
946        let input = "Hello, World! 123 — normal text.";
947        assert_eq!(scrub_content(input), input);
948    }
949
950    #[test]
951    fn test_truncate_prompt_within_limit() {
952        let result = truncate_prompt("short".into(), 100);
953        assert_eq!(result, "short");
954    }
955
956    #[test]
957    fn test_truncate_prompt_zero_max_bytes() {
958        let result = truncate_prompt("hello".into(), 0);
959        assert_eq!(result, "");
960    }
961
962    #[test]
963    fn test_truncate_prompt_long_facts() {
964        let facts: Vec<String> = (0..20)
965            .map(|i| format!("fact_{i}_{}", "x".repeat(20)))
966            .collect();
967        let prompt = facts.join("\n");
968        let result = truncate_prompt(prompt, 200);
969        assert!(
970            result.ends_with("..."),
971            "truncated prompt must end with '...'"
972        );
973        // byte length must be at most max_bytes + 3 (the "..." suffix)
974        assert!(result.len() <= 203);
975        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
976    }
977
978    #[test]
979    fn test_truncate_prompt_utf8_boundary() {
980        // Each '🔥' is 4 bytes; 100 emojis = 400 bytes.
981        let prompt = "🔥".repeat(100);
982        let result = truncate_prompt(prompt, 10);
983        assert!(
984            result.ends_with("..."),
985            "truncated prompt must end with '...'"
986        );
987        // floor_char_boundary(10) for 4-byte chars lands at 8 (2 full emojis = 8 bytes)
988        assert_eq!(result.len(), 8 + 3, "2 emojis (8 bytes) + '...' (3 bytes)");
989        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
990    }
991
992    #[tokio::test]
993    async fn test_assign_to_community_majority_vote() {
994        let store = setup().await;
995
996        // Setup: community C1 with members [A, B], then add D with edges to both A and B.
997        let a = store
998            .upsert_entity("AA", "AA", EntityType::Concept, None)
999            .await
1000            .unwrap()
1001            .0;
1002        let b = store
1003            .upsert_entity("BB", "BB", EntityType::Concept, None)
1004            .await
1005            .unwrap()
1006            .0;
1007        let d = store
1008            .upsert_entity("DD", "DD", EntityType::Concept, None)
1009            .await
1010            .unwrap()
1011            .0;
1012
1013        store
1014            .upsert_community("test_community", "summary", &[a, b], None)
1015            .await
1016            .unwrap();
1017
1018        store.insert_edge(d, a, "r", "f", 1.0, None).await.unwrap();
1019        store.insert_edge(d, b, "r", "f", 1.0, None).await.unwrap();
1020
1021        let result = assign_to_community(&store, d).await.unwrap();
1022        assert!(result.is_some());
1023
1024        // The returned ID must be valid for subsequent lookups (HIGH-IC-01 regression test).
1025        let returned_id = result.unwrap();
1026        let community = store
1027            .find_community_by_id(returned_id)
1028            .await
1029            .unwrap()
1030            .expect("returned community_id must reference an existing row");
1031        assert!(
1032            community.entity_ids.iter().any(|eid| eid.0 == d),
1033            "D should be added to the community"
1034        );
1035        // Fingerprint must be NULL after assign (cache invalidated for next detect run).
1036        assert!(
1037            community.fingerprint.is_none(),
1038            "fingerprint must be cleared after assign_to_community"
1039        );
1040    }
1041
1042    /// #1262: Second `detect_communities` call with no graph changes must produce 0 LLM calls.
1043    #[tokio::test]
1044    async fn test_incremental_detection_no_changes_skips_llm() {
1045        let store = setup().await;
1046        let (provider, call_buf) = recording_provider();
1047
1048        let a = store
1049            .upsert_entity("X", "X", EntityType::Concept, None)
1050            .await
1051            .unwrap()
1052            .0;
1053        let b = store
1054            .upsert_entity("Y", "Y", EntityType::Concept, None)
1055            .await
1056            .unwrap()
1057            .0;
1058        store
1059            .insert_edge(a, b, "r", "X relates Y", 1.0, None)
1060            .await
1061            .unwrap();
1062
1063        // First run: LLM called once to summarize the community.
1064        detect_communities(&store, &provider, usize::MAX, 4, 0)
1065            .await
1066            .unwrap();
1067        let first_calls = call_buf.lock().unwrap().len();
1068        assert_eq!(first_calls, 1, "first run must produce exactly 1 LLM call");
1069
1070        // Second run: graph unchanged — 0 LLM calls.
1071        detect_communities(&store, &provider, usize::MAX, 4, 0)
1072            .await
1073            .unwrap();
1074        let second_calls = call_buf.lock().unwrap().len();
1075        assert_eq!(
1076            second_calls, first_calls,
1077            "second run with no graph changes must produce 0 additional LLM calls"
1078        );
1079    }
1080
1081    /// #1262: Adding an edge changes the fingerprint — LLM must be called again.
1082    #[tokio::test]
1083    async fn test_incremental_detection_edge_change_triggers_resummary() {
1084        let store = setup().await;
1085        let (provider, call_buf) = recording_provider();
1086
1087        let a = store
1088            .upsert_entity("P", "P", EntityType::Concept, None)
1089            .await
1090            .unwrap()
1091            .0;
1092        let b = store
1093            .upsert_entity("Q", "Q", EntityType::Concept, None)
1094            .await
1095            .unwrap()
1096            .0;
1097        store
1098            .insert_edge(a, b, "r", "P relates Q", 1.0, None)
1099            .await
1100            .unwrap();
1101
1102        detect_communities(&store, &provider, usize::MAX, 4, 0)
1103            .await
1104            .unwrap();
1105        let after_first = call_buf.lock().unwrap().len();
1106        assert_eq!(after_first, 1);
1107
1108        // Add a new edge within the community to change its fingerprint.
1109        store
1110            .insert_edge(b, a, "r2", "Q also relates P", 1.0, None)
1111            .await
1112            .unwrap();
1113
1114        detect_communities(&store, &provider, usize::MAX, 4, 0)
1115            .await
1116            .unwrap();
1117        let after_second = call_buf.lock().unwrap().len();
1118        assert_eq!(
1119            after_second, 2,
1120            "edge change must trigger one additional LLM call"
1121        );
1122    }
1123
1124    /// #1262: Communities whose fingerprints vanish are deleted on refresh.
1125    #[tokio::test]
1126    async fn test_incremental_detection_dissolved_community_deleted() {
1127        let store = setup().await;
1128        let provider = mock_provider();
1129
1130        let a = store
1131            .upsert_entity("M1", "M1", EntityType::Concept, None)
1132            .await
1133            .unwrap()
1134            .0;
1135        let b = store
1136            .upsert_entity("M2", "M2", EntityType::Concept, None)
1137            .await
1138            .unwrap()
1139            .0;
1140        let edge_id = store
1141            .insert_edge(a, b, "r", "M1 relates M2", 1.0, None)
1142            .await
1143            .unwrap();
1144
1145        detect_communities(&store, &provider, usize::MAX, 4, 0)
1146            .await
1147            .unwrap();
1148        assert_eq!(store.community_count().await.unwrap(), 1);
1149
1150        // Invalidate the edge — community dissolves.
1151        store.invalidate_edge(edge_id).await.unwrap();
1152
1153        detect_communities(&store, &provider, usize::MAX, 4, 0)
1154            .await
1155            .unwrap();
1156        assert_eq!(
1157            store.community_count().await.unwrap(),
1158            0,
1159            "dissolved community must be deleted on next refresh"
1160        );
1161    }
1162
1163    /// #1260: Sequential fallback (concurrency=1) produces correct results.
1164    #[tokio::test]
1165    async fn test_detect_communities_concurrency_one() {
1166        let store = setup().await;
1167        let provider = mock_provider();
1168
1169        let a = store
1170            .upsert_entity("C1A", "C1A", EntityType::Concept, None)
1171            .await
1172            .unwrap()
1173            .0;
1174        let b = store
1175            .upsert_entity("C1B", "C1B", EntityType::Concept, None)
1176            .await
1177            .unwrap()
1178            .0;
1179        store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
1180
1181        let count = detect_communities(&store, &provider, usize::MAX, 1, 0)
1182            .await
1183            .unwrap();
1184        assert_eq!(count, 1, "concurrency=1 must still detect the community");
1185        assert_eq!(store.community_count().await.unwrap(), 1);
1186    }
1187
1188    #[test]
1189    fn test_compute_fingerprint_deterministic() {
1190        let fp1 = compute_partition_fingerprint(&[1, 2, 3], &[10, 20]);
1191        let fp2 = compute_partition_fingerprint(&[3, 1, 2], &[20, 10]);
1192        assert_eq!(fp1, fp2, "fingerprint must be order-independent");
1193
1194        let fp3 = compute_partition_fingerprint(&[1, 2, 3], &[10, 30]);
1195        assert_ne!(
1196            fp1, fp3,
1197            "different edge IDs must produce different fingerprint"
1198        );
1199
1200        let fp4 = compute_partition_fingerprint(&[1, 2, 4], &[10, 20]);
1201        assert_ne!(
1202            fp1, fp4,
1203            "different entity IDs must produce different fingerprint"
1204        );
1205    }
1206
1207    /// Domain separator test: entity/edge sequences with same raw bytes must not collide.
1208    ///
1209    /// Without domain separators, entities=[1,2] edges=[3] would hash identically to
1210    /// entities=[1] edges=[2,3] (same concatenated `le_bytes`). With separators they differ.
1211    #[test]
1212    fn test_compute_fingerprint_domain_separation() {
1213        let fp_a = compute_partition_fingerprint(&[1, 2], &[3]);
1214        let fp_b = compute_partition_fingerprint(&[1], &[2, 3]);
1215        assert_ne!(
1216            fp_a, fp_b,
1217            "entity/edge sequences with same raw bytes must produce different fingerprints"
1218        );
1219    }
1220
1221    /// Chunked loading with `chunk_size=1` must produce correct community assignments.
1222    ///
1223    /// Verifies: (a) community count is correct, (b) `edge_facts_map` and `edge_id_map` are fully
1224    /// populated (checked via community membership — all edges contribute to fingerprints),
1225    /// (c) the loop executes multiple iterations by using a tiny chunk size on a 3-edge graph.
1226    #[tokio::test]
1227    async fn test_detect_communities_chunked_correct_membership() {
1228        let store = setup().await;
1229        let provider = mock_provider();
1230
1231        // Build two isolated clusters: A-B-C and D-E.
1232        let node_alpha = store
1233            .upsert_entity("CA", "CA", EntityType::Concept, None)
1234            .await
1235            .unwrap()
1236            .0;
1237        let node_beta = store
1238            .upsert_entity("CB", "CB", EntityType::Concept, None)
1239            .await
1240            .unwrap()
1241            .0;
1242        let node_gamma = store
1243            .upsert_entity("CC", "CC", EntityType::Concept, None)
1244            .await
1245            .unwrap()
1246            .0;
1247        let node_delta = store
1248            .upsert_entity("CD", "CD", EntityType::Concept, None)
1249            .await
1250            .unwrap()
1251            .0;
1252        let node_epsilon = store
1253            .upsert_entity("CE", "CE", EntityType::Concept, None)
1254            .await
1255            .unwrap()
1256            .0;
1257
1258        store
1259            .insert_edge(node_alpha, node_beta, "r", "A-B fact", 1.0, None)
1260            .await
1261            .unwrap();
1262        store
1263            .insert_edge(node_beta, node_gamma, "r", "B-C fact", 1.0, None)
1264            .await
1265            .unwrap();
1266        store
1267            .insert_edge(node_delta, node_epsilon, "r", "D-E fact", 1.0, None)
1268            .await
1269            .unwrap();
1270
1271        // chunk_size=1: each edge is fetched individually — loop must execute 3 times.
1272        let count_chunked = detect_communities(&store, &provider, usize::MAX, 4, 1)
1273            .await
1274            .unwrap();
1275        assert_eq!(
1276            count_chunked, 2,
1277            "chunked loading must detect both communities"
1278        );
1279
1280        // Verify communities contain the correct members.
1281        let communities = store.all_communities().await.unwrap();
1282        assert_eq!(communities.len(), 2);
1283
1284        let abc_ids = [node_alpha, node_beta, node_gamma];
1285        let de_ids = [node_delta, node_epsilon];
1286        let has_abc = communities.iter().any(|comm| {
1287            abc_ids
1288                .iter()
1289                .all(|id| comm.entity_ids.iter().any(|eid| eid.0 == *id))
1290        });
1291        let has_de = communities.iter().any(|comm| {
1292            de_ids
1293                .iter()
1294                .all(|id| comm.entity_ids.iter().any(|eid| eid.0 == *id))
1295        });
1296        assert!(has_abc, "cluster A-B-C must form a community");
1297        assert!(has_de, "cluster D-E must form a community");
1298    }
1299
1300    /// `chunk_size=usize::MAX` must load all edges in a single query and produce correct results.
1301    #[tokio::test]
1302    async fn test_detect_communities_chunk_size_max() {
1303        let store = setup().await;
1304        let provider = mock_provider();
1305
1306        let x = store
1307            .upsert_entity("MX", "MX", EntityType::Concept, None)
1308            .await
1309            .unwrap()
1310            .0;
1311        let y = store
1312            .upsert_entity("MY", "MY", EntityType::Concept, None)
1313            .await
1314            .unwrap()
1315            .0;
1316        store
1317            .insert_edge(x, y, "r", "X-Y fact", 1.0, None)
1318            .await
1319            .unwrap();
1320
1321        let count = detect_communities(&store, &provider, usize::MAX, 4, usize::MAX)
1322            .await
1323            .unwrap();
1324        assert_eq!(count, 1, "chunk_size=usize::MAX must detect the community");
1325    }
1326
1327    /// `chunk_size=0` falls back to the stream path without panicking.
1328    #[tokio::test]
1329    async fn test_detect_communities_chunk_size_zero_fallback() {
1330        let store = setup().await;
1331        let provider = mock_provider();
1332
1333        let p = store
1334            .upsert_entity("ZP", "ZP", EntityType::Concept, None)
1335            .await
1336            .unwrap()
1337            .0;
1338        let q = store
1339            .upsert_entity("ZQ", "ZQ", EntityType::Concept, None)
1340            .await
1341            .unwrap()
1342            .0;
1343        store
1344            .insert_edge(p, q, "r", "P-Q fact", 1.0, None)
1345            .await
1346            .unwrap();
1347
1348        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
1349            .await
1350            .unwrap();
1351        assert_eq!(
1352            count, 1,
1353            "chunk_size=0 must detect the community via stream fallback"
1354        );
1355    }
1356
1357    /// Verifies that `edge_facts_map` is fully populated during chunked loading by checking
1358    /// that the community fingerprint changes when a new edge is added (fingerprint includes
1359    /// edge IDs, so any missed edges would produce a different or stale fingerprint).
1360    #[tokio::test]
1361    async fn test_detect_communities_chunked_edge_map_complete() {
1362        let store = setup().await;
1363        let (provider, call_buf) = recording_provider();
1364
1365        let a = store
1366            .upsert_entity("FA", "FA", EntityType::Concept, None)
1367            .await
1368            .unwrap()
1369            .0;
1370        let b = store
1371            .upsert_entity("FB", "FB", EntityType::Concept, None)
1372            .await
1373            .unwrap()
1374            .0;
1375        store
1376            .insert_edge(a, b, "r", "edge1 fact", 1.0, None)
1377            .await
1378            .unwrap();
1379
1380        // First detection with chunk_size=1.
1381        detect_communities(&store, &provider, usize::MAX, 4, 1)
1382            .await
1383            .unwrap();
1384        let calls_after_first = call_buf.lock().unwrap().len();
1385        assert_eq!(calls_after_first, 1, "first run must trigger 1 LLM call");
1386
1387        // Add another edge — fingerprint must change, triggering a second LLM call.
1388        store
1389            .insert_edge(b, a, "r2", "edge2 fact", 1.0, None)
1390            .await
1391            .unwrap();
1392
1393        detect_communities(&store, &provider, usize::MAX, 4, 1)
1394            .await
1395            .unwrap();
1396        let calls_after_second = call_buf.lock().unwrap().len();
1397        assert_eq!(
1398            calls_after_second, 2,
1399            "adding an edge must change fingerprint and trigger re-summarization"
1400        );
1401    }
1402
1403    /// `cleanup_stale_entity_embeddings` returns `Ok(0)` when the collection is empty.
1404    #[tokio::test]
1405    async fn cleanup_stale_empty_collection() {
1406        let store = setup().await;
1407        let sqlite_store = crate::store::SqliteStore::new(":memory:").await.unwrap();
1408        let pool = sqlite_store.pool().clone();
1409        let mem_store = Box::new(crate::in_memory_store::InMemoryVectorStore::new());
1410        let emb_store = crate::embedding_store::EmbeddingStore::with_store(mem_store, pool);
1411        emb_store
1412            .ensure_named_collection("zeph_graph_entities", 4)
1413            .await
1414            .unwrap();
1415
1416        let deleted = cleanup_stale_entity_embeddings(&store, &emb_store)
1417            .await
1418            .unwrap();
1419        assert_eq!(deleted, 0, "nothing to delete from empty collection");
1420    }
1421
1422    /// `cleanup_stale_entity_embeddings` deletes the Qdrant point when the `SQLite` entity row
1423    /// has been removed, and leaves live entities untouched.
1424    #[tokio::test]
1425    async fn cleanup_stale_deletes_orphaned_points() {
1426        use crate::graph::types::EntityType;
1427
1428        let sqlite_store = crate::store::SqliteStore::new(":memory:").await.unwrap();
1429        let pool = sqlite_store.pool().clone();
1430        let graph_store = GraphStore::new(pool.clone());
1431
1432        let mem_store = Box::new(crate::in_memory_store::InMemoryVectorStore::new());
1433        let emb_store = crate::embedding_store::EmbeddingStore::with_store(mem_store, pool.clone());
1434        emb_store
1435            .ensure_named_collection("zeph_graph_entities", 4)
1436            .await
1437            .unwrap();
1438
1439        // Insert two entities in SQLite.
1440        let live_id = graph_store
1441            .upsert_entity("Live", "live", EntityType::Person, None)
1442            .await
1443            .unwrap()
1444            .0;
1445        let stale_id = graph_store
1446            .upsert_entity("Stale", "stale", EntityType::Person, None)
1447            .await
1448            .unwrap()
1449            .0;
1450
1451        // Store embeddings with `entity_id_str` for both.
1452        let live_payload = serde_json::json!({
1453            "entity_id": live_id,
1454            "entity_id_str": live_id.to_string(),
1455            "name": "Live",
1456        });
1457        let stale_payload = serde_json::json!({
1458            "entity_id": stale_id,
1459            "entity_id_str": stale_id.to_string(),
1460            "name": "Stale",
1461        });
1462        emb_store
1463            .store_to_collection(
1464                "zeph_graph_entities",
1465                live_payload,
1466                vec![1.0, 0.0, 0.0, 0.0],
1467            )
1468            .await
1469            .unwrap();
1470        emb_store
1471            .store_to_collection(
1472                "zeph_graph_entities",
1473                stale_payload,
1474                vec![0.0, 1.0, 0.0, 0.0],
1475            )
1476            .await
1477            .unwrap();
1478
1479        // Delete the stale entity from SQLite (simulating eviction).
1480        zeph_db::query(zeph_db::sql!("DELETE FROM graph_entities WHERE id = ?"))
1481            .bind(stale_id)
1482            .execute(&pool)
1483            .await
1484            .unwrap();
1485
1486        let deleted = cleanup_stale_entity_embeddings(&graph_store, &emb_store)
1487            .await
1488            .unwrap();
1489        assert_eq!(deleted, 1, "exactly one stale point should be removed");
1490
1491        // The live entity's embedding must remain.
1492        let remaining = emb_store
1493            .scroll_all_entity_ids("zeph_graph_entities")
1494            .await
1495            .unwrap();
1496        assert_eq!(remaining.len(), 1);
1497        assert_eq!(remaining[0].1, live_id);
1498    }
1499}