Skip to main content

zeph_memory/graph/
community.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use futures::TryStreamExt as _;
8use petgraph::Graph;
9use petgraph::graph::NodeIndex;
10use tokio::sync::Semaphore;
11use tokio::task::JoinSet;
12use zeph_llm::LlmProvider as _;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{Message, Role};
15
16use crate::error::MemoryError;
17
18use super::store::GraphStore;
19use super::types::Entity;
20
/// Upper bound on label-propagation passes; the loop in `run_label_propagation`
/// also exits early as soon as a full pass changes no labels.
const MAX_LABEL_PROPAGATION_ITERATIONS: usize = 50;
22
/// Remove characters that could be abused for prompt injection from `s`.
///
/// Entity names and edge facts originate from untrusted text, so they are
/// scrubbed before being embedded in an LLM prompt. Dropped characters:
/// - every Unicode control character (`Cc` category, includes all ASCII controls)
/// - bidi embedding/override and isolate controls: U+202A–U+202E, U+2066–U+2069
/// - zero-width and invisible marks: U+200B–U+200F (covers U+200C, U+200D)
/// - the byte-order mark U+FEFF
fn scrub_content(s: &str) -> String {
    // Invisible/bidi code points that `char::is_control` does not cover.
    fn is_invisible(cp: u32) -> bool {
        matches!(cp, 0x200B..=0x200F | 0x202A..=0x202E | 0x2066..=0x2069 | 0xFEFF)
    }

    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        if !ch.is_control() && !is_invisible(ch as u32) {
            cleaned.push(ch);
        }
    }
    cleaned
}
41
/// Stats returned from graph eviction.
#[derive(Debug, Default)]
pub struct GraphEvictionStats {
    /// Edges removed by `delete_expired_edges` (expired past the retention window).
    pub expired_edges_deleted: usize,
    /// Entities removed by `delete_orphan_entities`.
    pub orphan_entities_deleted: usize,
    /// Entities removed by `cap_entities` to enforce the `max_entities` limit
    /// (0 when the cap is disabled).
    pub capped_entities_deleted: usize,
}
49
/// Truncate `prompt` to at most `max_bytes` at a UTF-8 boundary, appending `"..."`
/// if truncation occurred.
///
/// If `max_bytes` is 0, returns an empty string immediately (disables community summaries).
/// Otherwise the cut point is moved back to the nearest valid UTF-8 char boundary at or
/// below `max_bytes`, and `"..."` is appended (so the result may exceed `max_bytes` by 3).
fn truncate_prompt(prompt: String, max_bytes: usize) -> String {
    if max_bytes == 0 {
        return String::new();
    }
    if prompt.len() <= max_bytes {
        return prompt;
    }
    // Stable replacement for the nightly-only `str::floor_char_boundary`:
    // walk backwards (at most 3 steps, since UTF-8 chars are <= 4 bytes) until
    // we land on a char boundary. `max_bytes < prompt.len()` here, so indexing
    // is in range, and boundary 0 is always valid so the loop terminates.
    let mut boundary = max_bytes;
    while !prompt.is_char_boundary(boundary) {
        boundary -= 1;
    }
    format!("{}...", &prompt[..boundary])
}
65
66/// Compute a BLAKE3 fingerprint for a community partition.
67///
68/// The fingerprint is derived from sorted entity IDs and sorted intra-community edge IDs,
69/// ensuring both membership and edge mutations trigger re-summarization.
70/// BLAKE3 is used (not `DefaultHasher`) to guarantee determinism across process restarts.
71fn compute_partition_fingerprint(entity_ids: &[i64], intra_edge_ids: &[i64]) -> String {
72    let mut hasher = blake3::Hasher::new();
73    let mut sorted_entities = entity_ids.to_vec();
74    sorted_entities.sort_unstable();
75    hasher.update(b"entities");
76    for id in &sorted_entities {
77        hasher.update(&id.to_le_bytes());
78    }
79    let mut sorted_edges = intra_edge_ids.to_vec();
80    sorted_edges.sort_unstable();
81    hasher.update(b"edges");
82    for id in &sorted_edges {
83        hasher.update(&id.to_le_bytes());
84    }
85    hasher.finalize().to_hex().to_string()
86}
87
/// Per-community data collected before spawning LLM summarization tasks.
struct CommunityData {
    // IDs of all member entities.
    entity_ids: Vec<i64>,
    // Member entity names, already passed through `scrub_content`.
    entity_names: Vec<String>,
    // Facts from edges whose endpoints are both inside the community, scrubbed.
    intra_facts: Vec<String>,
    // Partition fingerprint from `compute_partition_fingerprint`.
    fingerprint: String,
    // Display name: top-3 entity names joined with ", " plus a "[label_index]" suffix.
    name: String,
}
96
/// Undirected entity graph: node weight is the entity's SQLite row ID, edges carry no weight.
type UndirectedGraph = Graph<i64, (), petgraph::Undirected>;
98
99async fn build_entity_graph_and_maps(
100    store: &GraphStore,
101    entities: &[Entity],
102    edge_chunk_size: usize,
103) -> Result<
104    (
105        UndirectedGraph,
106        HashMap<(i64, i64), Vec<String>>,
107        HashMap<(i64, i64), Vec<i64>>,
108    ),
109    MemoryError,
110> {
111    let mut graph = UndirectedGraph::new_undirected();
112    let mut node_map: HashMap<i64, NodeIndex> = HashMap::new();
113
114    for entity in entities {
115        let idx = graph.add_node(entity.id);
116        node_map.insert(entity.id, idx);
117    }
118
119    let mut edge_facts_map: HashMap<(i64, i64), Vec<String>> = HashMap::new();
120    let mut edge_id_map: HashMap<(i64, i64), Vec<i64>> = HashMap::new();
121
122    if edge_chunk_size == 0 {
123        let edges: Vec<_> = store.all_active_edges_stream().try_collect().await?;
124        for edge in &edges {
125            if let (Some(&src_idx), Some(&tgt_idx)) = (
126                node_map.get(&edge.source_entity_id),
127                node_map.get(&edge.target_entity_id),
128            ) {
129                graph.add_edge(src_idx, tgt_idx, ());
130            }
131            let key = (edge.source_entity_id, edge.target_entity_id);
132            edge_facts_map
133                .entry(key)
134                .or_default()
135                .push(edge.fact.clone());
136            edge_id_map.entry(key).or_default().push(edge.id);
137        }
138    } else {
139        let limit = i64::try_from(edge_chunk_size).unwrap_or(i64::MAX);
140        let mut last_id: i64 = 0;
141        loop {
142            let chunk = store.edges_after_id(last_id, limit).await?;
143            if chunk.is_empty() {
144                break;
145            }
146            last_id = chunk.last().expect("non-empty chunk has a last element").id;
147            for edge in &chunk {
148                if let (Some(&src_idx), Some(&tgt_idx)) = (
149                    node_map.get(&edge.source_entity_id),
150                    node_map.get(&edge.target_entity_id),
151                ) {
152                    graph.add_edge(src_idx, tgt_idx, ());
153                }
154                let key = (edge.source_entity_id, edge.target_entity_id);
155                edge_facts_map
156                    .entry(key)
157                    .or_default()
158                    .push(edge.fact.clone());
159                edge_id_map.entry(key).or_default().push(edge.id);
160            }
161        }
162    }
163
164    Ok((graph, edge_facts_map, edge_id_map))
165}
166
167fn run_label_propagation(graph: &UndirectedGraph) -> HashMap<usize, Vec<i64>> {
168    let mut labels: Vec<usize> = (0..graph.node_count()).collect();
169
170    for _ in 0..MAX_LABEL_PROPAGATION_ITERATIONS {
171        let mut changed = false;
172        for node_idx in graph.node_indices() {
173            let neighbors: Vec<NodeIndex> = graph.neighbors(node_idx).collect();
174            if neighbors.is_empty() {
175                continue;
176            }
177            let mut freq: HashMap<usize, usize> = HashMap::new();
178            for &nbr in &neighbors {
179                *freq.entry(labels[nbr.index()]).or_insert(0) += 1;
180            }
181            let max_count = *freq.values().max().unwrap_or(&0);
182            let best_label = freq
183                .iter()
184                .filter(|&(_, count)| *count == max_count)
185                .map(|(&label, _)| label)
186                .min()
187                .unwrap_or(labels[node_idx.index()]);
188            if labels[node_idx.index()] != best_label {
189                labels[node_idx.index()] = best_label;
190                changed = true;
191            }
192        }
193        if !changed {
194            break;
195        }
196    }
197
198    let mut communities: HashMap<usize, Vec<i64>> = HashMap::new();
199    for node_idx in graph.node_indices() {
200        let entity_id = graph[node_idx];
201        communities
202            .entry(labels[node_idx.index()])
203            .or_default()
204            .push(entity_id);
205    }
206    communities.retain(|_, members| members.len() >= 2);
207    communities
208}
209
/// Outcome of partition classification in `classify_communities`.
struct ClassifyResult {
    // Communities whose fingerprint is new or changed — they need an LLM summary.
    to_summarize: Vec<CommunityData>,
    // Communities whose fingerprint matched a stored one; their summaries are kept.
    unchanged_count: usize,
    // Fingerprints of every community in the new partition (used to find dissolved ones).
    new_fingerprints: std::collections::HashSet<String>,
}
215
216fn classify_communities(
217    communities: &HashMap<usize, Vec<i64>>,
218    edge_facts_map: &HashMap<(i64, i64), Vec<String>>,
219    edge_id_map: &HashMap<(i64, i64), Vec<i64>>,
220    entity_name_map: &HashMap<i64, &str>,
221    stored_fingerprints: &HashMap<String, i64>,
222    sorted_labels: &[usize],
223) -> ClassifyResult {
224    let mut to_summarize: Vec<CommunityData> = Vec::new();
225    let mut unchanged_count = 0usize;
226    let mut new_fingerprints: std::collections::HashSet<String> = std::collections::HashSet::new();
227
228    for (label_index, &label) in sorted_labels.iter().enumerate() {
229        let entity_ids = communities[&label].as_slice();
230        let member_set: std::collections::HashSet<i64> = entity_ids.iter().copied().collect();
231
232        let mut intra_facts: Vec<String> = Vec::new();
233        let mut intra_edge_ids: Vec<i64> = Vec::new();
234        for (&(src, tgt), facts) in edge_facts_map {
235            if member_set.contains(&src) && member_set.contains(&tgt) {
236                intra_facts.extend(facts.iter().map(|f| scrub_content(f)));
237                if let Some(ids) = edge_id_map.get(&(src, tgt)) {
238                    intra_edge_ids.extend_from_slice(ids);
239                }
240            }
241        }
242
243        let fingerprint = compute_partition_fingerprint(entity_ids, &intra_edge_ids);
244        new_fingerprints.insert(fingerprint.clone());
245
246        if stored_fingerprints.contains_key(&fingerprint) {
247            unchanged_count += 1;
248            continue;
249        }
250
251        let entity_names: Vec<String> = entity_ids
252            .iter()
253            .filter_map(|id| entity_name_map.get(id).map(|&s| scrub_content(s)))
254            .collect();
255
256        // Append label_index to prevent ON CONFLICT(name) collisions when two communities
257        // share the same top-3 entity names across detect_communities runs (IC-SIG-02).
258        let base_name = entity_names
259            .iter()
260            .take(3)
261            .cloned()
262            .collect::<Vec<_>>()
263            .join(", ");
264        let name = format!("{base_name} [{label_index}]");
265
266        to_summarize.push(CommunityData {
267            entity_ids: entity_ids.to_vec(),
268            entity_names,
269            intra_facts,
270            fingerprint,
271            name,
272        });
273    }
274
275    ClassifyResult {
276        to_summarize,
277        unchanged_count,
278        new_fingerprints,
279    }
280}
281
/// Spawn one LLM summarization task per community — bounded by `concurrency`
/// semaphore permits — then upsert each result into the store.
///
/// A task whose summary generation fails produces an empty summary instead of
/// aborting the run; a task that panics or is cancelled is logged and skipped,
/// so its community is simply not upserted this time. Returns the number of
/// communities upserted.
async fn summarize_and_upsert_communities(
    store: &GraphStore,
    provider: &AnyProvider,
    to_summarize: Vec<CommunityData>,
    concurrency: usize,
    community_summary_max_prompt_bytes: usize,
) -> Result<usize, MemoryError> {
    // max(1): guarantee at least one permit so tasks can always make progress.
    let semaphore = Arc::new(Semaphore::new(concurrency.max(1)));
    let mut join_set: JoinSet<(String, String, Vec<i64>, String)> = JoinSet::new();

    for data in to_summarize {
        let provider = provider.clone();
        let sem = Arc::clone(&semaphore);
        let max_bytes = community_summary_max_prompt_bytes;
        join_set.spawn(async move {
            let _permit = sem.acquire().await.expect("semaphore is never closed");
            // Degrade to an empty summary on LLM failure rather than failing the run.
            let summary = match generate_community_summary(
                &provider,
                &data.entity_names,
                &data.intra_facts,
                max_bytes,
            )
            .await
            {
                Ok(text) => text,
                Err(e) => {
                    tracing::warn!(community = %data.name, "community summary generation failed: {e:#}");
                    String::new()
                }
            };
            (data.name, summary, data.entity_ids, data.fingerprint)
        });
    }

    // Collect results — handle task panics explicitly (HIGH-01 fix).
    let mut results: Vec<(String, String, Vec<i64>, String)> = Vec::new();
    while let Some(outcome) = join_set.join_next().await {
        match outcome {
            Ok(tuple) => results.push(tuple),
            Err(e) => {
                tracing::error!(
                    panicked = e.is_panic(),
                    cancelled = e.is_cancelled(),
                    "community summary task failed"
                );
            }
        }
    }

    // Sort by community name so the upsert order is deterministic.
    results.sort_unstable_by(|a, b| a.0.cmp(&b.0));

    let mut count = 0usize;
    for (name, summary, entity_ids, fingerprint) in results {
        store
            .upsert_community(&name, &summary, &entity_ids, Some(&fingerprint))
            .await?;
        count += 1;
    }

    Ok(count)
}
343
/// Run label propagation on the full entity graph, generate community summaries via LLM,
/// and upsert results to `SQLite`.
///
/// Returns the number of communities detected (with `>= 2` entities).
///
/// Unchanged communities (same entity membership and intra-community edges) are skipped —
/// their existing summaries are preserved without LLM calls (incremental detection, #1262).
/// LLM calls for changed communities are parallelized via a `JoinSet` bounded by a
/// semaphore with `concurrency` permits (#1260).
///
/// # Panics
///
/// Does not panic in normal operation. The `semaphore.acquire().await.expect(...)` call is
/// infallible because the semaphore is never closed during the lifetime of this function.
///
/// # Errors
///
/// Returns an error if `SQLite` queries or LLM calls fail.
pub async fn detect_communities(
    store: &GraphStore,
    provider: &AnyProvider,
    community_summary_max_prompt_bytes: usize,
    concurrency: usize,
    edge_chunk_size: usize,
) -> Result<usize, MemoryError> {
    let entities = store.all_entities().await?;
    // A community needs at least two members; fewer entities can never form one.
    if entities.len() < 2 {
        return Ok(0);
    }

    let (graph, edge_facts_map, edge_id_map) =
        build_entity_graph_and_maps(store, &entities, edge_chunk_size).await?;

    let communities = run_label_propagation(&graph);

    let entity_name_map: HashMap<i64, &str> =
        entities.iter().map(|e| (e.id, e.name.as_str())).collect();
    let stored_fingerprints = store.community_fingerprints().await?;

    // Process labels in ascending order so generated community names are
    // stable across runs.
    let mut sorted_labels: Vec<usize> = communities.keys().copied().collect();
    sorted_labels.sort_unstable();

    let ClassifyResult {
        to_summarize,
        unchanged_count,
        new_fingerprints,
    } = classify_communities(
        &communities,
        &edge_facts_map,
        &edge_id_map,
        &entity_name_map,
        &stored_fingerprints,
        &sorted_labels,
    );

    tracing::debug!(
        total = sorted_labels.len(),
        unchanged = unchanged_count,
        to_summarize = to_summarize.len(),
        "community detection: partition classification complete"
    );

    // Delete dissolved communities (fingerprints no longer in new partition set).
    for (stored_fp, community_id) in &stored_fingerprints {
        if !new_fingerprints.contains(stored_fp.as_str()) {
            store.delete_community_by_id(*community_id).await?;
        }
    }

    let new_count = summarize_and_upsert_communities(
        store,
        provider,
        to_summarize,
        concurrency,
        community_summary_max_prompt_bytes,
    )
    .await?;

    Ok(unchanged_count + new_count)
}
424
425/// Assign a single entity to an existing community via neighbor majority vote.
426///
427/// Returns `Some(community_id)` if assigned, `None` if no neighbors have communities.
428///
429/// When an entity is added, the stored fingerprint is cleared (`NULL`) so the next
430/// `detect_communities` run will re-summarize the affected community (CRIT-02 fix).
431///
432/// # Errors
433///
434/// Returns an error if `SQLite` queries fail.
435pub async fn assign_to_community(
436    store: &GraphStore,
437    entity_id: i64,
438) -> Result<Option<i64>, MemoryError> {
439    let edges = store.edges_for_entity(entity_id).await?;
440    if edges.is_empty() {
441        return Ok(None);
442    }
443
444    let neighbor_ids: Vec<i64> = edges
445        .iter()
446        .map(|e| {
447            if e.source_entity_id == entity_id {
448                e.target_entity_id
449            } else {
450                e.source_entity_id
451            }
452        })
453        .collect();
454
455    let mut community_votes: HashMap<i64, usize> = HashMap::new();
456    for &nbr_id in &neighbor_ids {
457        if let Some(community) = store.community_for_entity(nbr_id).await? {
458            *community_votes.entry(community.id).or_insert(0) += 1;
459        }
460    }
461
462    if community_votes.is_empty() {
463        return Ok(None);
464    }
465
466    // Majority vote — tie-break by smallest community_id.
467    // community_votes is non-empty (checked above), so max_by always returns Some.
468    let Some((&best_community_id, _)) =
469        community_votes
470            .iter()
471            .max_by(|&(&id_a, &count_a), &(&id_b, &count_b)| {
472                count_a.cmp(&count_b).then(id_b.cmp(&id_a))
473            })
474    else {
475        return Ok(None);
476    };
477
478    if let Some(mut target) = store.find_community_by_id(best_community_id).await? {
479        if !target.entity_ids.contains(&entity_id) {
480            target.entity_ids.push(entity_id);
481            store
482                .upsert_community(&target.name, &target.summary, &target.entity_ids, None)
483                .await?;
484            // Clear fingerprint to invalidate cache — next detect_communities will re-summarize.
485            store.clear_community_fingerprint(best_community_id).await?;
486        }
487        return Ok(Some(best_community_id));
488    }
489
490    Ok(None)
491}
492
/// Remove `Qdrant` points for entities that no longer exist in `SQLite`.
///
/// Returns the number of stale points deleted.
///
/// Currently a stub: always returns `Ok(0)` without touching either store
/// (see the TODO below for the planned implementation).
///
/// # Errors
///
/// Returns an error if `Qdrant` operations fail.
pub async fn cleanup_stale_entity_embeddings(
    _store: &GraphStore,
    _embeddings: &crate::embedding_store::EmbeddingStore,
) -> Result<usize, MemoryError> {
    // TODO: implement when EmbeddingStore exposes a scroll_all API
    // (follow-up: add pub async fn scroll_all(&self, collection, key_field) delegating to
    // self.ops.scroll_all). Then enumerate Qdrant points, collect IDs where entity_id is
    // not in SQLite, and delete stale points.
    Ok(0)
}
510
511/// Run graph eviction: clean expired edges, orphan entities, and cap entity count.
512///
513/// # Errors
514///
515/// Returns an error if `SQLite` queries fail.
516pub async fn run_graph_eviction(
517    store: &GraphStore,
518    expired_edge_retention_days: u32,
519    max_entities: usize,
520) -> Result<GraphEvictionStats, MemoryError> {
521    let expired_edges_deleted = store
522        .delete_expired_edges(expired_edge_retention_days)
523        .await?;
524    let orphan_entities_deleted = store
525        .delete_orphan_entities(expired_edge_retention_days)
526        .await?;
527    let capped_entities_deleted = if max_entities > 0 {
528        store.cap_entities(max_entities).await?
529    } else {
530        0
531    };
532
533    Ok(GraphEvictionStats {
534        expired_edges_deleted,
535        orphan_entities_deleted,
536        capped_entities_deleted,
537    })
538}
539
540async fn generate_community_summary(
541    provider: &AnyProvider,
542    entity_names: &[String],
543    edge_facts: &[String],
544    max_prompt_bytes: usize,
545) -> Result<String, MemoryError> {
546    let entities_str = entity_names.join(", ");
547    // Cap facts at 20 to bound prompt size; data is already scrubbed upstream.
548    let facts_str = edge_facts
549        .iter()
550        .take(20)
551        .map(|f| format!("- {f}"))
552        .collect::<Vec<_>>()
553        .join("\n");
554
555    let raw_prompt = format!(
556        "Summarize the following group of related entities and their relationships \
557         into a single paragraph (2-3 sentences). Focus on the theme that connects \
558         them and the key relationships.\n\nEntities: {entities_str}\n\
559         Relationships:\n{facts_str}\n\nSummary:"
560    );
561
562    let original_bytes = raw_prompt.len();
563    let truncated = raw_prompt.len() > max_prompt_bytes;
564    let prompt = truncate_prompt(raw_prompt, max_prompt_bytes);
565    if prompt.is_empty() {
566        return Ok(String::new());
567    }
568    if truncated {
569        tracing::warn!(
570            entity_count = entity_names.len(),
571            original_bytes,
572            truncated_bytes = prompt.len(),
573            "community summary prompt truncated"
574        );
575    }
576
577    let messages = [Message::from_legacy(Role::User, prompt)];
578    let response: String = provider.chat(&messages).await.map_err(MemoryError::Llm)?;
579    Ok(response)
580}
581
582#[cfg(test)]
583mod tests {
584    use std::sync::{Arc, Mutex};
585
586    use super::*;
587    use crate::graph::types::EntityType;
588    use crate::sqlite::SqliteStore;
589
    /// Fresh in-memory SQLite-backed graph store for a single test.
    async fn setup() -> GraphStore {
        let store = SqliteStore::new(":memory:").await.unwrap();
        GraphStore::new(store.pool().clone())
    }

    /// Mock LLM provider with default (canned) behavior — no network access.
    fn mock_provider() -> AnyProvider {
        AnyProvider::Mock(zeph_llm::mock::MockProvider::default())
    }

    /// Mock provider plus a shared buffer capturing every message batch sent to it.
    fn recording_provider() -> (
        AnyProvider,
        Arc<Mutex<Vec<Vec<zeph_llm::provider::Message>>>>,
    ) {
        let (mock, buf) = zeph_llm::mock::MockProvider::default().with_recording();
        (AnyProvider::Mock(mock), buf)
    }
606
    #[tokio::test]
    async fn test_detect_communities_empty_graph() {
        // No entities at all — detect_communities must short-circuit with 0.
        let store = setup().await;
        let provider = mock_provider();
        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_detect_communities_single_entity() {
        // One entity is below the 2-member minimum for a community.
        let store = setup().await;
        let provider = mock_provider();
        store
            .upsert_entity("Solo", "Solo", EntityType::Concept, None)
            .await
            .unwrap();
        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 0, "single isolated entity must not form a community");
    }
630
    #[tokio::test]
    async fn test_single_entity_community_filtered() {
        let store = setup().await;
        let provider = mock_provider();

        // Create 3 connected entities (cluster A) and 1 isolated entity.
        let a = store
            .upsert_entity("A", "A", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("B", "B", EntityType::Concept, None)
            .await
            .unwrap();
        let c = store
            .upsert_entity("C", "C", EntityType::Concept, None)
            .await
            .unwrap();
        let iso = store
            .upsert_entity("Isolated", "Isolated", EntityType::Concept, None)
            .await
            .unwrap();

        // Chain A-B-C so the three form one connected cluster.
        store
            .insert_edge(a, b, "r", "A relates B", 1.0, None)
            .await
            .unwrap();
        store
            .insert_edge(b, c, "r", "B relates C", 1.0, None)
            .await
            .unwrap();

        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        // Isolated entity has no edges — must NOT be persisted as a community.
        assert_eq!(count, 1, "only the 3-entity cluster should be detected");

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 1);
        assert!(
            !communities[0].entity_ids.contains(&iso),
            "isolated entity must not be in any community"
        );
    }
676
    #[tokio::test]
    async fn test_label_propagation_basic() {
        let store = setup().await;
        let provider = mock_provider();

        // Create 4 clusters of 3 entities each (12 entities total), fully isolated.
        let mut cluster_ids: Vec<Vec<i64>> = Vec::new();
        for cluster in 0..4_i64 {
            let mut ids = Vec::new();
            for node in 0..3_i64 {
                let name = format!("c{cluster}_n{node}");
                let id = store
                    .upsert_entity(&name, &name, EntityType::Concept, None)
                    .await
                    .unwrap();
                ids.push(id);
            }
            // Connect nodes within cluster (chain: 0-1-2).
            store
                .insert_edge(ids[0], ids[1], "r", "f", 1.0, None)
                .await
                .unwrap();
            store
                .insert_edge(ids[1], ids[2], "r", "f", 1.0, None)
                .await
                .unwrap();
            cluster_ids.push(ids);
        }

        // Disconnected clusters must never merge under label propagation.
        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 4, "expected 4 communities, one per cluster");

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 4);

        // Each cluster's entity IDs must appear in exactly one community.
        for ids in &cluster_ids {
            let found = communities
                .iter()
                .filter(|c| ids.iter().any(|id| c.entity_ids.contains(id)))
                .count();
            assert_eq!(
                found, 1,
                "all nodes of a cluster must be in the same community"
            );
        }
    }
726
    #[tokio::test]
    async fn test_all_isolated_nodes() {
        let store = setup().await;
        let provider = mock_provider();

        // Insert 5 entities with no edges at all.
        for i in 0..5_i64 {
            store
                .upsert_entity(
                    &format!("iso_{i}"),
                    &format!("iso_{i}"),
                    EntityType::Concept,
                    None,
                )
                .await
                .unwrap();
        }

        // Every node keeps its own label, so all "communities" are singletons and filtered.
        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 0, "zero-edge graph must produce no communities");
        assert_eq!(store.community_count().await.unwrap(), 0);
    }
751
    #[tokio::test]
    async fn test_eviction_expired_edges() {
        let store = setup().await;

        let a = store
            .upsert_entity("EA", "EA", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("EB", "EB", EntityType::Concept, None)
            .await
            .unwrap();
        let edge_id = store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
        store.invalidate_edge(edge_id).await.unwrap();

        // Manually set expired_at to a date far in the past to trigger deletion.
        sqlx::query(
            "UPDATE graph_edges SET expired_at = datetime('now', '-200 days') WHERE id = ?1",
        )
        .bind(edge_id)
        .execute(store.pool())
        .await
        .unwrap();

        // 200 days old > 90-day retention, so the edge must be purged.
        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
        assert_eq!(stats.expired_edges_deleted, 1);
    }

    #[tokio::test]
    async fn test_eviction_orphan_entities() {
        let store = setup().await;

        let iso = store
            .upsert_entity("Orphan", "Orphan", EntityType::Concept, None)
            .await
            .unwrap();

        // Set last_seen_at to far in the past.
        sqlx::query(
            "UPDATE graph_entities SET last_seen_at = datetime('now', '-200 days') WHERE id = ?1",
        )
        .bind(iso)
        .execute(store.pool())
        .await
        .unwrap();

        // Edge-less entity past the retention window must be collected.
        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
        assert_eq!(stats.orphan_entities_deleted, 1);
    }
801
    #[tokio::test]
    async fn test_eviction_entity_cap() {
        let store = setup().await;

        // Insert 5 entities with no edges (so they can be capped).
        for i in 0..5_i64 {
            let name = format!("cap_entity_{i}");
            store
                .upsert_entity(&name, &name, EntityType::Concept, None)
                .await
                .unwrap();
        }

        // Cap of 3 against 5 entities must evict exactly 2.
        let stats = run_graph_eviction(&store, 90, 3).await.unwrap();
        assert_eq!(
            stats.capped_entities_deleted, 2,
            "should delete 5-3=2 entities"
        );
        assert_eq!(store.entity_count().await.unwrap(), 3);
    }

    #[tokio::test]
    async fn test_assign_to_community_no_neighbors() {
        // Entity with no edges has nobody to vote — assignment must return None.
        let store = setup().await;
        let entity_id = store
            .upsert_entity("Loner", "Loner", EntityType::Concept, None)
            .await
            .unwrap();

        let result = assign_to_community(&store, entity_id).await.unwrap();
        assert!(result.is_none());
    }
834
    #[tokio::test]
    async fn test_extraction_count_persistence() {
        use tempfile::NamedTempFile;
        // Create a real on-disk SQLite DB to verify persistence across store instances.
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_owned();

        let store1 = {
            let s = crate::sqlite::SqliteStore::new(&path).await.unwrap();
            GraphStore::new(s.pool().clone())
        };

        // Write the counter several times; only the final value should survive.
        store1.set_metadata("extraction_count", "0").await.unwrap();
        for i in 1..=5_i64 {
            store1
                .set_metadata("extraction_count", &i.to_string())
                .await
                .unwrap();
        }

        // Open a second handle to the same file and verify the value persists.
        let store2 = {
            let s = crate::sqlite::SqliteStore::new(&path).await.unwrap();
            GraphStore::new(s.pool().clone())
        };
        assert_eq!(store2.extraction_count().await.unwrap(), 5);
    }
862
    #[test]
    fn test_scrub_content_ascii_control() {
        // Newline, carriage return, null byte, tab (all ASCII control chars) must be stripped.
        let input = "hello\nworld\r\x00\x01\x09end";
        assert_eq!(scrub_content(input), "helloworldend");
    }

    #[test]
    fn test_scrub_content_bidi_overrides() {
        // U+202A LEFT-TO-RIGHT EMBEDDING, U+202E RIGHT-TO-LEFT OVERRIDE,
        // U+2066 LEFT-TO-RIGHT ISOLATE, U+2069 POP DIRECTIONAL ISOLATE.
        let input = "safe\u{202A}inject\u{202E}end\u{2066}iso\u{2069}done".to_string();
        assert_eq!(scrub_content(&input), "safeinjectendisodone");
    }

    #[test]
    fn test_scrub_content_zero_width() {
        // U+200B ZERO WIDTH SPACE, U+200C ZERO WIDTH NON-JOINER, U+200D ZERO WIDTH JOINER,
        // U+200F RIGHT-TO-LEFT MARK.
        let input = "a\u{200B}b\u{200C}c\u{200D}d\u{200F}e".to_string();
        assert_eq!(scrub_content(&input), "abcde");
    }

    #[test]
    fn test_scrub_content_bom() {
        // U+FEFF BYTE ORDER MARK must be stripped.
        let input = "\u{FEFF}hello".to_string();
        assert_eq!(scrub_content(&input), "hello");
    }

    #[test]
    fn test_scrub_content_clean_string_unchanged() {
        // Printable text (including non-ASCII punctuation) must pass through untouched.
        let input = "Hello, World! 123 — normal text.";
        assert_eq!(scrub_content(input), input);
    }
898
899    #[test]
900    fn test_truncate_prompt_within_limit() {
901        let result = truncate_prompt("short".into(), 100);
902        assert_eq!(result, "short");
903    }
904
905    #[test]
906    fn test_truncate_prompt_zero_max_bytes() {
907        let result = truncate_prompt("hello".into(), 0);
908        assert_eq!(result, "");
909    }
910
911    #[test]
912    fn test_truncate_prompt_long_facts() {
913        let facts: Vec<String> = (0..20)
914            .map(|i| format!("fact_{i}_{}", "x".repeat(20)))
915            .collect();
916        let prompt = facts.join("\n");
917        let result = truncate_prompt(prompt, 200);
918        assert!(
919            result.ends_with("..."),
920            "truncated prompt must end with '...'"
921        );
922        // byte length must be at most max_bytes + 3 (the "..." suffix)
923        assert!(result.len() <= 203);
924        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
925    }
926
927    #[test]
928    fn test_truncate_prompt_utf8_boundary() {
929        // Each '🔥' is 4 bytes; 100 emojis = 400 bytes.
930        let prompt = "🔥".repeat(100);
931        let result = truncate_prompt(prompt, 10);
932        assert!(
933            result.ends_with("..."),
934            "truncated prompt must end with '...'"
935        );
936        // floor_char_boundary(10) for 4-byte chars lands at 8 (2 full emojis = 8 bytes)
937        assert_eq!(result.len(), 8 + 3, "2 emojis (8 bytes) + '...' (3 bytes)");
938        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
939    }
940
941    #[tokio::test]
942    async fn test_assign_to_community_majority_vote() {
943        let store = setup().await;
944
945        // Setup: community C1 with members [A, B], then add D with edges to both A and B.
946        let a = store
947            .upsert_entity("AA", "AA", EntityType::Concept, None)
948            .await
949            .unwrap();
950        let b = store
951            .upsert_entity("BB", "BB", EntityType::Concept, None)
952            .await
953            .unwrap();
954        let d = store
955            .upsert_entity("DD", "DD", EntityType::Concept, None)
956            .await
957            .unwrap();
958
959        store
960            .upsert_community("test_community", "summary", &[a, b], None)
961            .await
962            .unwrap();
963
964        store.insert_edge(d, a, "r", "f", 1.0, None).await.unwrap();
965        store.insert_edge(d, b, "r", "f", 1.0, None).await.unwrap();
966
967        let result = assign_to_community(&store, d).await.unwrap();
968        assert!(result.is_some());
969
970        // The returned ID must be valid for subsequent lookups (HIGH-IC-01 regression test).
971        let returned_id = result.unwrap();
972        let community = store
973            .find_community_by_id(returned_id)
974            .await
975            .unwrap()
976            .expect("returned community_id must reference an existing row");
977        assert!(
978            community.entity_ids.contains(&d),
979            "D should be added to the community"
980        );
981        // Fingerprint must be NULL after assign (cache invalidated for next detect run).
982        assert!(
983            community.fingerprint.is_none(),
984            "fingerprint must be cleared after assign_to_community"
985        );
986    }
987
988    /// #1262: Second `detect_communities` call with no graph changes must produce 0 LLM calls.
989    #[tokio::test]
990    async fn test_incremental_detection_no_changes_skips_llm() {
991        let store = setup().await;
992        let (provider, call_buf) = recording_provider();
993
994        let a = store
995            .upsert_entity("X", "X", EntityType::Concept, None)
996            .await
997            .unwrap();
998        let b = store
999            .upsert_entity("Y", "Y", EntityType::Concept, None)
1000            .await
1001            .unwrap();
1002        store
1003            .insert_edge(a, b, "r", "X relates Y", 1.0, None)
1004            .await
1005            .unwrap();
1006
1007        // First run: LLM called once to summarize the community.
1008        detect_communities(&store, &provider, usize::MAX, 4, 0)
1009            .await
1010            .unwrap();
1011        let first_calls = call_buf.lock().unwrap().len();
1012        assert_eq!(first_calls, 1, "first run must produce exactly 1 LLM call");
1013
1014        // Second run: graph unchanged — 0 LLM calls.
1015        detect_communities(&store, &provider, usize::MAX, 4, 0)
1016            .await
1017            .unwrap();
1018        let second_calls = call_buf.lock().unwrap().len();
1019        assert_eq!(
1020            second_calls, first_calls,
1021            "second run with no graph changes must produce 0 additional LLM calls"
1022        );
1023    }
1024
1025    /// #1262: Adding an edge changes the fingerprint — LLM must be called again.
1026    #[tokio::test]
1027    async fn test_incremental_detection_edge_change_triggers_resummary() {
1028        let store = setup().await;
1029        let (provider, call_buf) = recording_provider();
1030
1031        let a = store
1032            .upsert_entity("P", "P", EntityType::Concept, None)
1033            .await
1034            .unwrap();
1035        let b = store
1036            .upsert_entity("Q", "Q", EntityType::Concept, None)
1037            .await
1038            .unwrap();
1039        store
1040            .insert_edge(a, b, "r", "P relates Q", 1.0, None)
1041            .await
1042            .unwrap();
1043
1044        detect_communities(&store, &provider, usize::MAX, 4, 0)
1045            .await
1046            .unwrap();
1047        let after_first = call_buf.lock().unwrap().len();
1048        assert_eq!(after_first, 1);
1049
1050        // Add a new edge within the community to change its fingerprint.
1051        store
1052            .insert_edge(b, a, "r2", "Q also relates P", 1.0, None)
1053            .await
1054            .unwrap();
1055
1056        detect_communities(&store, &provider, usize::MAX, 4, 0)
1057            .await
1058            .unwrap();
1059        let after_second = call_buf.lock().unwrap().len();
1060        assert_eq!(
1061            after_second, 2,
1062            "edge change must trigger one additional LLM call"
1063        );
1064    }
1065
1066    /// #1262: Communities whose fingerprints vanish are deleted on refresh.
1067    #[tokio::test]
1068    async fn test_incremental_detection_dissolved_community_deleted() {
1069        let store = setup().await;
1070        let provider = mock_provider();
1071
1072        let a = store
1073            .upsert_entity("M1", "M1", EntityType::Concept, None)
1074            .await
1075            .unwrap();
1076        let b = store
1077            .upsert_entity("M2", "M2", EntityType::Concept, None)
1078            .await
1079            .unwrap();
1080        let edge_id = store
1081            .insert_edge(a, b, "r", "M1 relates M2", 1.0, None)
1082            .await
1083            .unwrap();
1084
1085        detect_communities(&store, &provider, usize::MAX, 4, 0)
1086            .await
1087            .unwrap();
1088        assert_eq!(store.community_count().await.unwrap(), 1);
1089
1090        // Invalidate the edge — community dissolves.
1091        store.invalidate_edge(edge_id).await.unwrap();
1092
1093        detect_communities(&store, &provider, usize::MAX, 4, 0)
1094            .await
1095            .unwrap();
1096        assert_eq!(
1097            store.community_count().await.unwrap(),
1098            0,
1099            "dissolved community must be deleted on next refresh"
1100        );
1101    }
1102
1103    /// #1260: Sequential fallback (concurrency=1) produces correct results.
1104    #[tokio::test]
1105    async fn test_detect_communities_concurrency_one() {
1106        let store = setup().await;
1107        let provider = mock_provider();
1108
1109        let a = store
1110            .upsert_entity("C1A", "C1A", EntityType::Concept, None)
1111            .await
1112            .unwrap();
1113        let b = store
1114            .upsert_entity("C1B", "C1B", EntityType::Concept, None)
1115            .await
1116            .unwrap();
1117        store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
1118
1119        let count = detect_communities(&store, &provider, usize::MAX, 1, 0)
1120            .await
1121            .unwrap();
1122        assert_eq!(count, 1, "concurrency=1 must still detect the community");
1123        assert_eq!(store.community_count().await.unwrap(), 1);
1124    }
1125
1126    #[test]
1127    fn test_compute_fingerprint_deterministic() {
1128        let fp1 = compute_partition_fingerprint(&[1, 2, 3], &[10, 20]);
1129        let fp2 = compute_partition_fingerprint(&[3, 1, 2], &[20, 10]);
1130        assert_eq!(fp1, fp2, "fingerprint must be order-independent");
1131
1132        let fp3 = compute_partition_fingerprint(&[1, 2, 3], &[10, 30]);
1133        assert_ne!(
1134            fp1, fp3,
1135            "different edge IDs must produce different fingerprint"
1136        );
1137
1138        let fp4 = compute_partition_fingerprint(&[1, 2, 4], &[10, 20]);
1139        assert_ne!(
1140            fp1, fp4,
1141            "different entity IDs must produce different fingerprint"
1142        );
1143    }
1144
1145    /// Domain separator test: entity/edge sequences with same raw bytes must not collide.
1146    ///
1147    /// Without domain separators, entities=[1,2] edges=[3] would hash identically to
1148    /// entities=[1] edges=[2,3] (same concatenated `le_bytes`). With separators they differ.
1149    #[test]
1150    fn test_compute_fingerprint_domain_separation() {
1151        let fp_a = compute_partition_fingerprint(&[1, 2], &[3]);
1152        let fp_b = compute_partition_fingerprint(&[1], &[2, 3]);
1153        assert_ne!(
1154            fp_a, fp_b,
1155            "entity/edge sequences with same raw bytes must produce different fingerprints"
1156        );
1157    }
1158
1159    /// Chunked loading with chunk_size=1 must produce correct community assignments.
1160    ///
1161    /// Verifies: (a) community count is correct, (b) edge_facts_map and edge_id_map are fully
1162    /// populated (checked via community membership — all edges contribute to fingerprints),
1163    /// (c) the loop executes multiple iterations by using a tiny chunk size on a 3-edge graph.
1164    #[tokio::test]
1165    async fn test_detect_communities_chunked_correct_membership() {
1166        let store = setup().await;
1167        let provider = mock_provider();
1168
1169        // Build two isolated clusters: A-B-C and D-E.
1170        let a = store
1171            .upsert_entity("CA", "CA", EntityType::Concept, None)
1172            .await
1173            .unwrap();
1174        let b = store
1175            .upsert_entity("CB", "CB", EntityType::Concept, None)
1176            .await
1177            .unwrap();
1178        let c = store
1179            .upsert_entity("CC", "CC", EntityType::Concept, None)
1180            .await
1181            .unwrap();
1182        let d = store
1183            .upsert_entity("CD", "CD", EntityType::Concept, None)
1184            .await
1185            .unwrap();
1186        let e = store
1187            .upsert_entity("CE", "CE", EntityType::Concept, None)
1188            .await
1189            .unwrap();
1190
1191        store
1192            .insert_edge(a, b, "r", "A-B fact", 1.0, None)
1193            .await
1194            .unwrap();
1195        store
1196            .insert_edge(b, c, "r", "B-C fact", 1.0, None)
1197            .await
1198            .unwrap();
1199        store
1200            .insert_edge(d, e, "r", "D-E fact", 1.0, None)
1201            .await
1202            .unwrap();
1203
1204        // chunk_size=1: each edge is fetched individually — loop must execute 3 times.
1205        let count_chunked = detect_communities(&store, &provider, usize::MAX, 4, 1)
1206            .await
1207            .unwrap();
1208        assert_eq!(
1209            count_chunked, 2,
1210            "chunked loading must detect both communities"
1211        );
1212
1213        // Verify communities contain the correct members.
1214        let communities = store.all_communities().await.unwrap();
1215        assert_eq!(communities.len(), 2);
1216
1217        let abc_ids = [a, b, c];
1218        let de_ids = [d, e];
1219        let has_abc = communities
1220            .iter()
1221            .any(|c| abc_ids.iter().all(|id| c.entity_ids.contains(id)));
1222        let has_de = communities
1223            .iter()
1224            .any(|c| de_ids.iter().all(|id| c.entity_ids.contains(id)));
1225        assert!(has_abc, "cluster A-B-C must form a community");
1226        assert!(has_de, "cluster D-E must form a community");
1227    }
1228
1229    /// chunk_size=usize::MAX must load all edges in a single query and produce correct results.
1230    #[tokio::test]
1231    async fn test_detect_communities_chunk_size_max() {
1232        let store = setup().await;
1233        let provider = mock_provider();
1234
1235        let x = store
1236            .upsert_entity("MX", "MX", EntityType::Concept, None)
1237            .await
1238            .unwrap();
1239        let y = store
1240            .upsert_entity("MY", "MY", EntityType::Concept, None)
1241            .await
1242            .unwrap();
1243        store
1244            .insert_edge(x, y, "r", "X-Y fact", 1.0, None)
1245            .await
1246            .unwrap();
1247
1248        let count = detect_communities(&store, &provider, usize::MAX, 4, usize::MAX)
1249            .await
1250            .unwrap();
1251        assert_eq!(count, 1, "chunk_size=usize::MAX must detect the community");
1252    }
1253
1254    /// chunk_size=0 falls back to the stream path without panicking.
1255    #[tokio::test]
1256    async fn test_detect_communities_chunk_size_zero_fallback() {
1257        let store = setup().await;
1258        let provider = mock_provider();
1259
1260        let p = store
1261            .upsert_entity("ZP", "ZP", EntityType::Concept, None)
1262            .await
1263            .unwrap();
1264        let q = store
1265            .upsert_entity("ZQ", "ZQ", EntityType::Concept, None)
1266            .await
1267            .unwrap();
1268        store
1269            .insert_edge(p, q, "r", "P-Q fact", 1.0, None)
1270            .await
1271            .unwrap();
1272
1273        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
1274            .await
1275            .unwrap();
1276        assert_eq!(
1277            count, 1,
1278            "chunk_size=0 must detect the community via stream fallback"
1279        );
1280    }
1281
1282    /// Verifies that edge_facts_map is fully populated during chunked loading by checking
1283    /// that the community fingerprint changes when a new edge is added (fingerprint includes
1284    /// edge IDs, so any missed edges would produce a different or stale fingerprint).
1285    #[tokio::test]
1286    async fn test_detect_communities_chunked_edge_map_complete() {
1287        let store = setup().await;
1288        let (provider, call_buf) = recording_provider();
1289
1290        let a = store
1291            .upsert_entity("FA", "FA", EntityType::Concept, None)
1292            .await
1293            .unwrap();
1294        let b = store
1295            .upsert_entity("FB", "FB", EntityType::Concept, None)
1296            .await
1297            .unwrap();
1298        store
1299            .insert_edge(a, b, "r", "edge1 fact", 1.0, None)
1300            .await
1301            .unwrap();
1302
1303        // First detection with chunk_size=1.
1304        detect_communities(&store, &provider, usize::MAX, 4, 1)
1305            .await
1306            .unwrap();
1307        let calls_after_first = call_buf.lock().unwrap().len();
1308        assert_eq!(calls_after_first, 1, "first run must trigger 1 LLM call");
1309
1310        // Add another edge — fingerprint must change, triggering a second LLM call.
1311        store
1312            .insert_edge(b, a, "r2", "edge2 fact", 1.0, None)
1313            .await
1314            .unwrap();
1315
1316        detect_communities(&store, &provider, usize::MAX, 4, 1)
1317            .await
1318            .unwrap();
1319        let calls_after_second = call_buf.lock().unwrap().len();
1320        assert_eq!(
1321            calls_after_second, 2,
1322            "adding an edge must change fingerprint and trigger re-summarization"
1323        );
1324    }
1325}