// zeph_memory/graph/community.rs
1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use futures::TryStreamExt as _;
8use petgraph::Graph;
9use petgraph::graph::NodeIndex;
10use tokio::sync::Semaphore;
11use tokio::task::JoinSet;
12use zeph_llm::LlmProvider as _;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{Message, Role};
15
16use crate::error::MemoryError;
17
18use super::store::GraphStore;
19
/// Safety cap on label-propagation sweeps; the algorithm exits early as soon as
/// a full pass changes no labels, so this bound is only hit on pathological graphs.
const MAX_LABEL_PROPAGATION_ITERATIONS: usize = 50;
21
/// Remove characters that could smuggle prompt-injection payloads out of untrusted
/// entity names or edge facts before they are embedded in an LLM prompt.
///
/// Dropped characters:
/// - every Unicode control character (`char::is_control`, category `Cc`)
/// - bidi embedding/override/isolate controls: U+202A–U+202E and U+2066–U+2069
/// - zero-width / invisible marks: U+200B–U+200F (covers U+200C, U+200D)
/// - the byte-order mark U+FEFF
fn scrub_content(s: &str) -> String {
    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        let code = ch as u32;
        let invisible = matches!(
            code,
            0x200B..=0x200F | 0x202A..=0x202E | 0x2066..=0x2069 | 0xFEFF
        );
        if !ch.is_control() && !invisible {
            cleaned.push(ch);
        }
    }
    cleaned
}
40
/// Stats returned from graph eviction.
#[derive(Debug, Default)]
pub struct GraphEvictionStats {
    /// Edges deleted because they expired beyond the retention window.
    pub expired_edges_deleted: usize,
    /// Entities deleted by `delete_orphan_entities` (also scoped by the retention window).
    pub orphan_entities_deleted: usize,
    /// Entities deleted to enforce the configured `max_entities` cap (0 when capping is disabled).
    pub capped_entities_deleted: usize,
}
48
/// Truncate `prompt` to at most `max_bytes` at a UTF-8 boundary, appending `"..."`
/// if truncation occurred.
///
/// If `max_bytes` is 0, returns an empty string immediately (disables community summaries).
/// Otherwise the cut point is walked back to the nearest valid UTF-8 char boundary
/// (never splitting a multi-byte character) and `"..."` is appended.
///
/// Uses a stable `is_char_boundary` scan instead of the unstable
/// `str::floor_char_boundary` (`round_char_boundary` feature), so this builds on
/// stable Rust with identical semantics.
fn truncate_prompt(prompt: String, max_bytes: usize) -> String {
    if max_bytes == 0 {
        return String::new();
    }
    if prompt.len() <= max_bytes {
        return prompt;
    }
    // Walk back from max_bytes to the closest char boundary at or below it.
    // Terminates because index 0 is always a boundary; max_bytes < prompt.len()
    // here, so the index is always in range.
    let mut boundary = max_bytes;
    while !prompt.is_char_boundary(boundary) {
        boundary -= 1;
    }
    format!("{}...", &prompt[..boundary])
}
64
65/// Compute a BLAKE3 fingerprint for a community partition.
66///
67/// The fingerprint is derived from sorted entity IDs and sorted intra-community edge IDs,
68/// ensuring both membership and edge mutations trigger re-summarization.
69/// BLAKE3 is used (not `DefaultHasher`) to guarantee determinism across process restarts.
70fn compute_partition_fingerprint(entity_ids: &[i64], intra_edge_ids: &[i64]) -> String {
71    let mut hasher = blake3::Hasher::new();
72    let mut sorted_entities = entity_ids.to_vec();
73    sorted_entities.sort_unstable();
74    hasher.update(b"entities");
75    for id in &sorted_entities {
76        hasher.update(&id.to_le_bytes());
77    }
78    let mut sorted_edges = intra_edge_ids.to_vec();
79    sorted_edges.sort_unstable();
80    hasher.update(b"edges");
81    for id in &sorted_edges {
82        hasher.update(&id.to_le_bytes());
83    }
84    hasher.finalize().to_hex().to_string()
85}
86
/// Per-community data collected before spawning LLM summarization tasks.
struct CommunityData {
    // IDs of all member entities in this community.
    entity_ids: Vec<i64>,
    // Scrubbed entity names, used to build the summarization prompt.
    entity_names: Vec<String>,
    // Scrubbed facts from edges whose endpoints are both inside the community.
    intra_facts: Vec<String>,
    // BLAKE3 partition fingerprint (sorted membership + intra-edge IDs).
    fingerprint: String,
    // Display name: top-3 entity names plus a `[label_index]` suffix.
    name: String,
}
95
/// Run label propagation on the full entity graph, generate community summaries via LLM,
/// and upsert results to `SQLite`.
///
/// Returns the number of communities detected (with `>= 2` entities).
///
/// Unchanged communities (same entity membership and intra-community edges) are skipped —
/// their existing summaries are preserved without LLM calls (incremental detection, #1262).
/// LLM calls for changed communities are parallelized via a `JoinSet` bounded by a
/// semaphore with `concurrency` permits (#1260).
///
/// # Panics
///
/// Does not panic in normal operation. The `semaphore.acquire().await.expect(...)` call is
/// infallible because the semaphore is never closed during the lifetime of this function.
///
/// # Errors
///
/// Returns an error if `SQLite` queries or LLM calls fail.
#[allow(clippy::too_many_lines)]
pub async fn detect_communities(
    store: &GraphStore,
    provider: &AnyProvider,
    community_summary_max_prompt_bytes: usize,
    concurrency: usize,
    edge_chunk_size: usize,
) -> Result<usize, MemoryError> {
    let entities = store.all_entities().await?;
    if entities.len() < 2 {
        // A community needs at least two members; nothing to detect.
        return Ok(0);
    }

    // Build undirected graph: node weight = entity_id, no edge weight.
    // Tie-breaking in label propagation is deterministic for a given dataset
    // (labels are NodeIndex values assigned in ORDER BY id ASC order), but may
    // vary if entity IDs change after deletion/re-insertion.
    let mut graph = Graph::<i64, (), petgraph::Undirected>::new_undirected();
    let mut node_map: HashMap<i64, NodeIndex> = HashMap::new();

    for entity in &entities {
        let idx = graph.add_node(entity.id);
        node_map.insert(entity.id, idx);
    }

    // Build edge fact/id lookup maps alongside the petgraph, consuming edges in chunks.
    // Each chunk is fetched and discarded, eliminating the need to hold a full Vec<Edge>.
    // The true peak memory saving is in the Edge struct overhead (~130-180 bytes per edge
    // beyond chunk_size): fact strings are still fully materialized in edge_facts_map.
    let mut edge_facts_map: HashMap<(i64, i64), Vec<String>> = HashMap::new();
    let mut edge_id_map: HashMap<(i64, i64), Vec<i64>> = HashMap::new();

    if edge_chunk_size == 0 {
        // chunk_size=0: fall back to streaming all edges at once (legacy path).
        let edges: Vec<_> = store.all_active_edges_stream().try_collect().await?;
        for edge in &edges {
            // Edges referencing unknown entities are skipped in the petgraph but
            // still recorded in the fact/id maps under their (source, target) key.
            if let (Some(&src_idx), Some(&tgt_idx)) = (
                node_map.get(&edge.source_entity_id),
                node_map.get(&edge.target_entity_id),
            ) {
                graph.add_edge(src_idx, tgt_idx, ());
            }
            let key = (edge.source_entity_id, edge.target_entity_id);
            edge_facts_map
                .entry(key)
                .or_default()
                .push(edge.fact.clone());
            edge_id_map.entry(key).or_default().push(edge.id);
        }
    } else {
        let limit = i64::try_from(edge_chunk_size).unwrap_or(i64::MAX);
        let mut last_id: i64 = 0;
        // Keyset pagination over edge IDs: fetch, ingest, advance cursor.
        loop {
            let chunk = store.edges_after_id(last_id, limit).await?;
            if chunk.is_empty() {
                break;
            }
            // Safe: chunk is non-empty (checked above).
            last_id = chunk.last().expect("non-empty chunk has a last element").id;
            for edge in &chunk {
                if let (Some(&src_idx), Some(&tgt_idx)) = (
                    node_map.get(&edge.source_entity_id),
                    node_map.get(&edge.target_entity_id),
                ) {
                    graph.add_edge(src_idx, tgt_idx, ());
                }
                let key = (edge.source_entity_id, edge.target_entity_id);
                edge_facts_map
                    .entry(key)
                    .or_default()
                    .push(edge.fact.clone());
                edge_id_map.entry(key).or_default().push(edge.id);
            }
            // chunk is dropped here; only edge_facts_map and edge_id_map retain edge data.
        }
    }

    // Label propagation: each node starts with its own NodeIndex as label.
    let mut labels: Vec<usize> = (0..graph.node_count()).collect();

    for _ in 0..MAX_LABEL_PROPAGATION_ITERATIONS {
        let mut changed = false;
        for node_idx in graph.node_indices() {
            let neighbors: Vec<NodeIndex> = graph.neighbors(node_idx).collect();
            if neighbors.is_empty() {
                continue;
            }

            // Count how often each label occurs among this node's neighbors.
            let mut freq: HashMap<usize, usize> = HashMap::new();
            for &nbr in &neighbors {
                *freq.entry(labels[nbr.index()]).or_insert(0) += 1;
            }

            // neighbors is non-empty, so freq is non-empty — max and min are safe.
            let max_count = *freq.values().max().unwrap_or(&0);
            // Tie-break: smallest label value among tied candidates (deterministic).
            let best_label = freq
                .iter()
                .filter(|&(_, count)| *count == max_count)
                .map(|(&label, _)| label)
                .min()
                .unwrap_or(labels[node_idx.index()]);

            if labels[node_idx.index()] != best_label {
                labels[node_idx.index()] = best_label;
                changed = true;
            }
        }
        // Converged: a full sweep changed nothing.
        if !changed {
            break;
        }
    }

    // Group entities by final label.
    let mut communities: HashMap<usize, Vec<i64>> = HashMap::new();
    for node_idx in graph.node_indices() {
        let entity_id = graph[node_idx];
        communities
            .entry(labels[node_idx.index()])
            .or_default()
            .push(entity_id);
    }

    // Keep only communities with >= 2 entities.
    communities.retain(|_, members| members.len() >= 2);

    // Build entity name lookup for summary generation.
    let entity_name_map: HashMap<i64, &str> =
        entities.iter().map(|e| (e.id, e.name.as_str())).collect();

    // Load stored fingerprints: fingerprint -> community_id (for unchanged partition detection).
    let stored_fingerprints = store.community_fingerprints().await?;

    // Sort community labels for stable label_index assignment (HIGH-02 fix).
    let mut sorted_labels: Vec<usize> = communities.keys().copied().collect();
    sorted_labels.sort_unstable();

    // Prepare per-community data: compute fingerprints, classify into changed/unchanged.
    let mut to_summarize: Vec<CommunityData> = Vec::new();
    let mut unchanged_count = 0usize;
    let mut new_fingerprints: std::collections::HashSet<String> = std::collections::HashSet::new();

    for (label_index, &label) in sorted_labels.iter().enumerate() {
        let entity_ids = communities[&label].as_slice();
        let member_set: std::collections::HashSet<i64> = entity_ids.iter().copied().collect();

        // Collect facts and edge IDs for edges whose endpoints are both members.
        let mut intra_facts: Vec<String> = Vec::new();
        let mut intra_edge_ids: Vec<i64> = Vec::new();
        for (&(src, tgt), facts) in &edge_facts_map {
            if member_set.contains(&src) && member_set.contains(&tgt) {
                intra_facts.extend(facts.iter().map(|f| scrub_content(f)));
                if let Some(ids) = edge_id_map.get(&(src, tgt)) {
                    intra_edge_ids.extend_from_slice(ids);
                }
            }
        }

        let fingerprint = compute_partition_fingerprint(entity_ids, &intra_edge_ids);
        new_fingerprints.insert(fingerprint.clone());

        if stored_fingerprints.contains_key(&fingerprint) {
            // Partition unchanged — no LLM call needed.
            unchanged_count += 1;
            continue;
        }

        let entity_names: Vec<String> = entity_ids
            .iter()
            .filter_map(|id| entity_name_map.get(id).map(|&s| scrub_content(s)))
            .collect();

        // Append label_index to prevent ON CONFLICT(name) collisions when two communities
        // share the same top-3 entity names across detect_communities runs (IC-SIG-02).
        let base_name = entity_names
            .iter()
            .take(3)
            .cloned()
            .collect::<Vec<_>>()
            .join(", ");
        let name = format!("{base_name} [{label_index}]");

        to_summarize.push(CommunityData {
            entity_ids: entity_ids.to_vec(),
            entity_names,
            intra_facts,
            fingerprint,
            name,
        });
    }

    tracing::debug!(
        total = sorted_labels.len(),
        unchanged = unchanged_count,
        to_summarize = to_summarize.len(),
        "community detection: partition classification complete"
    );

    // Delete dissolved communities — those whose fingerprints no longer appear in the new
    // partition set. Only applicable when stored fingerprints exist (not the first run).
    for (stored_fp, community_id) in &stored_fingerprints {
        if !new_fingerprints.contains(stored_fp.as_str()) {
            store.delete_community_by_id(*community_id).await?;
        }
    }

    // Spawn LLM summarization tasks concurrently, bounded by semaphore.
    // Task output tuple: (community name, summary, member entity IDs, fingerprint).
    let semaphore = Arc::new(Semaphore::new(concurrency.max(1)));
    let mut join_set: JoinSet<(String, String, Vec<i64>, String)> = JoinSet::new();

    for data in to_summarize {
        let provider = provider.clone();
        let sem = Arc::clone(&semaphore);
        let max_bytes = community_summary_max_prompt_bytes;
        join_set.spawn(async move {
            let _permit = sem.acquire().await.expect("semaphore is never closed");
            // A failed summary is logged and degraded to an empty string rather
            // than failing the whole detection run.
            let summary = match generate_community_summary(
                &provider,
                &data.entity_names,
                &data.intra_facts,
                max_bytes,
            )
            .await
            {
                Ok(text) => text,
                Err(e) => {
                    tracing::warn!(
                        community = %data.name,
                        "community summary generation failed: {e:#}"
                    );
                    String::new()
                }
            };
            (data.name, summary, data.entity_ids, data.fingerprint)
        });
    }

    // Collect results — handle task panics explicitly (HIGH-01 fix).
    let mut results: Vec<(String, String, Vec<i64>, String)> = Vec::new();
    while let Some(outcome) = join_set.join_next().await {
        match outcome {
            Ok(tuple) => results.push(tuple),
            Err(e) => {
                tracing::error!(
                    panicked = e.is_panic(),
                    cancelled = e.is_cancelled(),
                    "community summary task failed"
                );
            }
        }
    }

    // Sort results for deterministic upsert order.
    results.sort_unstable_by(|a, b| a.0.cmp(&b.0));

    // Upsert sequentially (SQLite is single-writer).
    // Unchanged communities keep their stored summaries and still count toward the total.
    let mut count = unchanged_count;
    for (name, summary, entity_ids, fingerprint) in results {
        store
            .upsert_community(&name, &summary, &entity_ids, Some(&fingerprint))
            .await?;
        count += 1;
    }

    Ok(count)
}
379
380/// Assign a single entity to an existing community via neighbor majority vote.
381///
382/// Returns `Some(community_id)` if assigned, `None` if no neighbors have communities.
383///
384/// When an entity is added, the stored fingerprint is cleared (`NULL`) so the next
385/// `detect_communities` run will re-summarize the affected community (CRIT-02 fix).
386///
387/// # Errors
388///
389/// Returns an error if `SQLite` queries fail.
390pub async fn assign_to_community(
391    store: &GraphStore,
392    entity_id: i64,
393) -> Result<Option<i64>, MemoryError> {
394    let edges = store.edges_for_entity(entity_id).await?;
395    if edges.is_empty() {
396        return Ok(None);
397    }
398
399    let neighbor_ids: Vec<i64> = edges
400        .iter()
401        .map(|e| {
402            if e.source_entity_id == entity_id {
403                e.target_entity_id
404            } else {
405                e.source_entity_id
406            }
407        })
408        .collect();
409
410    let mut community_votes: HashMap<i64, usize> = HashMap::new();
411    for &nbr_id in &neighbor_ids {
412        if let Some(community) = store.community_for_entity(nbr_id).await? {
413            *community_votes.entry(community.id).or_insert(0) += 1;
414        }
415    }
416
417    if community_votes.is_empty() {
418        return Ok(None);
419    }
420
421    // Majority vote — tie-break by smallest community_id.
422    // community_votes is non-empty (checked above), so max_by always returns Some.
423    let Some((&best_community_id, _)) =
424        community_votes
425            .iter()
426            .max_by(|&(&id_a, &count_a), &(&id_b, &count_b)| {
427                count_a.cmp(&count_b).then(id_b.cmp(&id_a))
428            })
429    else {
430        return Ok(None);
431    };
432
433    if let Some(mut target) = store.find_community_by_id(best_community_id).await? {
434        if !target.entity_ids.contains(&entity_id) {
435            target.entity_ids.push(entity_id);
436            store
437                .upsert_community(&target.name, &target.summary, &target.entity_ids, None)
438                .await?;
439            // Clear fingerprint to invalidate cache — next detect_communities will re-summarize.
440            store.clear_community_fingerprint(best_community_id).await?;
441        }
442        return Ok(Some(best_community_id));
443    }
444
445    Ok(None)
446}
447
/// Remove `Qdrant` points for entities that no longer exist in `SQLite`.
///
/// Returns the number of stale points deleted.
///
/// Currently a no-op stub that always reports 0 deletions — see the TODO below.
///
/// # Errors
///
/// Returns an error if `Qdrant` operations fail.
pub async fn cleanup_stale_entity_embeddings(
    _store: &GraphStore,
    _embeddings: &crate::embedding_store::EmbeddingStore,
) -> Result<usize, MemoryError> {
    // TODO: implement when EmbeddingStore exposes a scroll_all API
    // (follow-up: add pub async fn scroll_all(&self, collection, key_field) delegating to
    // self.ops.scroll_all). Then enumerate Qdrant points, collect IDs where entity_id is
    // not in SQLite, and delete stale points.
    Ok(0)
}
465
466/// Run graph eviction: clean expired edges, orphan entities, and cap entity count.
467///
468/// # Errors
469///
470/// Returns an error if `SQLite` queries fail.
471pub async fn run_graph_eviction(
472    store: &GraphStore,
473    expired_edge_retention_days: u32,
474    max_entities: usize,
475) -> Result<GraphEvictionStats, MemoryError> {
476    let expired_edges_deleted = store
477        .delete_expired_edges(expired_edge_retention_days)
478        .await?;
479    let orphan_entities_deleted = store
480        .delete_orphan_entities(expired_edge_retention_days)
481        .await?;
482    let capped_entities_deleted = if max_entities > 0 {
483        store.cap_entities(max_entities).await?
484    } else {
485        0
486    };
487
488    Ok(GraphEvictionStats {
489        expired_edges_deleted,
490        orphan_entities_deleted,
491        capped_entities_deleted,
492    })
493}
494
495async fn generate_community_summary(
496    provider: &AnyProvider,
497    entity_names: &[String],
498    edge_facts: &[String],
499    max_prompt_bytes: usize,
500) -> Result<String, MemoryError> {
501    let entities_str = entity_names.join(", ");
502    // Cap facts at 20 to bound prompt size; data is already scrubbed upstream.
503    let facts_str = edge_facts
504        .iter()
505        .take(20)
506        .map(|f| format!("- {f}"))
507        .collect::<Vec<_>>()
508        .join("\n");
509
510    let raw_prompt = format!(
511        "Summarize the following group of related entities and their relationships \
512         into a single paragraph (2-3 sentences). Focus on the theme that connects \
513         them and the key relationships.\n\nEntities: {entities_str}\n\
514         Relationships:\n{facts_str}\n\nSummary:"
515    );
516
517    let original_bytes = raw_prompt.len();
518    let truncated = raw_prompt.len() > max_prompt_bytes;
519    let prompt = truncate_prompt(raw_prompt, max_prompt_bytes);
520    if prompt.is_empty() {
521        return Ok(String::new());
522    }
523    if truncated {
524        tracing::warn!(
525            entity_count = entity_names.len(),
526            original_bytes,
527            truncated_bytes = prompt.len(),
528            "community summary prompt truncated"
529        );
530    }
531
532    let messages = [Message::from_legacy(Role::User, prompt)];
533    let response: String = provider.chat(&messages).await.map_err(MemoryError::Llm)?;
534    Ok(response)
535}
536
537#[cfg(test)]
538mod tests {
539    use std::sync::{Arc, Mutex};
540
541    use super::*;
542    use crate::graph::types::EntityType;
543    use crate::sqlite::SqliteStore;
544
    // Fresh in-memory SQLite store — keeps each test hermetic and fast.
    async fn setup() -> GraphStore {
        let store = SqliteStore::new(":memory:").await.unwrap();
        GraphStore::new(store.pool().clone())
    }

    // Mock LLM provider — no network calls during tests.
    fn mock_provider() -> AnyProvider {
        AnyProvider::Mock(zeph_llm::mock::MockProvider::default())
    }

    // Mock provider plus a shared buffer capturing every message batch sent to it.
    fn recording_provider() -> (
        AnyProvider,
        Arc<Mutex<Vec<Vec<zeph_llm::provider::Message>>>>,
    ) {
        let (mock, buf) = zeph_llm::mock::MockProvider::default().with_recording();
        (AnyProvider::Mock(mock), buf)
    }
561
    #[tokio::test]
    async fn test_detect_communities_empty_graph() {
        // A store with no entities must report zero communities.
        let store = setup().await;
        let provider = mock_provider();
        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_detect_communities_single_entity() {
        // Fewer than two entities short-circuits detection before any graph work.
        let store = setup().await;
        let provider = mock_provider();
        store
            .upsert_entity("Solo", "Solo", EntityType::Concept, None)
            .await
            .unwrap();
        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 0, "single isolated entity must not form a community");
    }
585
    #[tokio::test]
    async fn test_single_entity_community_filtered() {
        // A connected 3-entity cluster forms one community; an isolated entity is excluded.
        let store = setup().await;
        let provider = mock_provider();

        // Create 3 connected entities (cluster A) and 1 isolated entity.
        let a = store
            .upsert_entity("A", "A", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("B", "B", EntityType::Concept, None)
            .await
            .unwrap();
        let c = store
            .upsert_entity("C", "C", EntityType::Concept, None)
            .await
            .unwrap();
        let iso = store
            .upsert_entity("Isolated", "Isolated", EntityType::Concept, None)
            .await
            .unwrap();

        store
            .insert_edge(a, b, "r", "A relates B", 1.0, None)
            .await
            .unwrap();
        store
            .insert_edge(b, c, "r", "B relates C", 1.0, None)
            .await
            .unwrap();

        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        // Isolated entity has no edges — must NOT be persisted as a community.
        assert_eq!(count, 1, "only the 3-entity cluster should be detected");

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 1);
        assert!(
            !communities[0].entity_ids.contains(&iso),
            "isolated entity must not be in any community"
        );
    }
631
    #[tokio::test]
    async fn test_label_propagation_basic() {
        // Four disjoint 3-node chains must resolve to exactly four communities.
        let store = setup().await;
        let provider = mock_provider();

        // Create 4 clusters of 3 entities each (12 entities total), fully isolated.
        let mut cluster_ids: Vec<Vec<i64>> = Vec::new();
        for cluster in 0..4_i64 {
            let mut ids = Vec::new();
            for node in 0..3_i64 {
                let name = format!("c{cluster}_n{node}");
                let id = store
                    .upsert_entity(&name, &name, EntityType::Concept, None)
                    .await
                    .unwrap();
                ids.push(id);
            }
            // Connect nodes within cluster (chain: 0-1-2).
            store
                .insert_edge(ids[0], ids[1], "r", "f", 1.0, None)
                .await
                .unwrap();
            store
                .insert_edge(ids[1], ids[2], "r", "f", 1.0, None)
                .await
                .unwrap();
            cluster_ids.push(ids);
        }

        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 4, "expected 4 communities, one per cluster");

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 4);

        // Each cluster's entity IDs must appear in exactly one community.
        for ids in &cluster_ids {
            let found = communities
                .iter()
                .filter(|c| ids.iter().any(|id| c.entity_ids.contains(id)))
                .count();
            assert_eq!(
                found, 1,
                "all nodes of a cluster must be in the same community"
            );
        }
    }
681
    #[tokio::test]
    async fn test_all_isolated_nodes() {
        // Entities without any edges must never be grouped into communities.
        let store = setup().await;
        let provider = mock_provider();

        // Insert 5 entities with no edges at all.
        for i in 0..5_i64 {
            store
                .upsert_entity(
                    &format!("iso_{i}"),
                    &format!("iso_{i}"),
                    EntityType::Concept,
                    None,
                )
                .await
                .unwrap();
        }

        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 0, "zero-edge graph must produce no communities");
        assert_eq!(store.community_count().await.unwrap(), 0);
    }
706
    #[tokio::test]
    async fn test_eviction_expired_edges() {
        // An edge expired well past the retention window must be physically deleted.
        let store = setup().await;

        let a = store
            .upsert_entity("EA", "EA", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("EB", "EB", EntityType::Concept, None)
            .await
            .unwrap();
        let edge_id = store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
        store.invalidate_edge(edge_id).await.unwrap();

        // Manually set expired_at to a date far in the past to trigger deletion.
        sqlx::query(
            "UPDATE graph_edges SET expired_at = datetime('now', '-200 days') WHERE id = ?1",
        )
        .bind(edge_id)
        .execute(store.pool())
        .await
        .unwrap();

        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
        assert_eq!(stats.expired_edges_deleted, 1);
    }

    #[tokio::test]
    async fn test_eviction_orphan_entities() {
        // An edgeless entity not seen within the retention window must be evicted.
        let store = setup().await;

        let iso = store
            .upsert_entity("Orphan", "Orphan", EntityType::Concept, None)
            .await
            .unwrap();

        // Set last_seen_at to far in the past.
        sqlx::query(
            "UPDATE graph_entities SET last_seen_at = datetime('now', '-200 days') WHERE id = ?1",
        )
        .bind(iso)
        .execute(store.pool())
        .await
        .unwrap();

        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
        assert_eq!(stats.orphan_entities_deleted, 1);
    }
756
    #[tokio::test]
    async fn test_eviction_entity_cap() {
        // With max_entities = 3 and 5 stored entities, exactly two must be trimmed.
        let store = setup().await;

        // Insert 5 entities with no edges (so they can be capped).
        for i in 0..5_i64 {
            let name = format!("cap_entity_{i}");
            store
                .upsert_entity(&name, &name, EntityType::Concept, None)
                .await
                .unwrap();
        }

        let stats = run_graph_eviction(&store, 90, 3).await.unwrap();
        assert_eq!(
            stats.capped_entities_deleted, 2,
            "should delete 5-3=2 entities"
        );
        assert_eq!(store.entity_count().await.unwrap(), 3);
    }

    #[tokio::test]
    async fn test_assign_to_community_no_neighbors() {
        // An entity with no edges cannot be voted into any community.
        let store = setup().await;
        let entity_id = store
            .upsert_entity("Loner", "Loner", EntityType::Concept, None)
            .await
            .unwrap();

        let result = assign_to_community(&store, entity_id).await.unwrap();
        assert!(result.is_none());
    }
789
    #[tokio::test]
    async fn test_extraction_count_persistence() {
        use tempfile::NamedTempFile;
        // Create a real on-disk SQLite DB to verify persistence across store instances.
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_owned();

        let store1 = {
            let s = crate::sqlite::SqliteStore::new(&path).await.unwrap();
            GraphStore::new(s.pool().clone())
        };

        // Each write overwrites the key; only the final value (5) should survive.
        store1.set_metadata("extraction_count", "0").await.unwrap();
        for i in 1..=5_i64 {
            store1
                .set_metadata("extraction_count", &i.to_string())
                .await
                .unwrap();
        }

        // Open a second handle to the same file and verify the value persists.
        let store2 = {
            let s = crate::sqlite::SqliteStore::new(&path).await.unwrap();
            GraphStore::new(s.pool().clone())
        };
        assert_eq!(store2.extraction_count().await.unwrap(), 5);
    }
817
818    #[test]
819    fn test_scrub_content_ascii_control() {
820        // Newline, carriage return, null byte, tab (all ASCII control chars) must be stripped.
821        let input = "hello\nworld\r\x00\x01\x09end";
822        assert_eq!(scrub_content(input), "helloworldend");
823    }
824
825    #[test]
826    fn test_scrub_content_bidi_overrides() {
827        // U+202A LEFT-TO-RIGHT EMBEDDING, U+202E RIGHT-TO-LEFT OVERRIDE,
828        // U+2066 LEFT-TO-RIGHT ISOLATE, U+2069 POP DIRECTIONAL ISOLATE.
829        let input = "safe\u{202A}inject\u{202E}end\u{2066}iso\u{2069}done".to_string();
830        assert_eq!(scrub_content(&input), "safeinjectendisodone");
831    }
832
833    #[test]
834    fn test_scrub_content_zero_width() {
835        // U+200B ZERO WIDTH SPACE, U+200C ZERO WIDTH NON-JOINER, U+200D ZERO WIDTH JOINER,
836        // U+200F RIGHT-TO-LEFT MARK.
837        let input = "a\u{200B}b\u{200C}c\u{200D}d\u{200F}e".to_string();
838        assert_eq!(scrub_content(&input), "abcde");
839    }
840
841    #[test]
842    fn test_scrub_content_bom() {
843        // U+FEFF BYTE ORDER MARK must be stripped.
844        let input = "\u{FEFF}hello".to_string();
845        assert_eq!(scrub_content(&input), "hello");
846    }
847
848    #[test]
849    fn test_scrub_content_clean_string_unchanged() {
850        let input = "Hello, World! 123 — normal text.";
851        assert_eq!(scrub_content(input), input);
852    }
853
854    #[test]
855    fn test_truncate_prompt_within_limit() {
856        let result = truncate_prompt("short".into(), 100);
857        assert_eq!(result, "short");
858    }
859
860    #[test]
861    fn test_truncate_prompt_zero_max_bytes() {
862        let result = truncate_prompt("hello".into(), 0);
863        assert_eq!(result, "");
864    }
865
866    #[test]
867    fn test_truncate_prompt_long_facts() {
868        let facts: Vec<String> = (0..20)
869            .map(|i| format!("fact_{i}_{}", "x".repeat(20)))
870            .collect();
871        let prompt = facts.join("\n");
872        let result = truncate_prompt(prompt, 200);
873        assert!(
874            result.ends_with("..."),
875            "truncated prompt must end with '...'"
876        );
877        // byte length must be at most max_bytes + 3 (the "..." suffix)
878        assert!(result.len() <= 203);
879        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
880    }
881
882    #[test]
883    fn test_truncate_prompt_utf8_boundary() {
884        // Each '🔥' is 4 bytes; 100 emojis = 400 bytes.
885        let prompt = "🔥".repeat(100);
886        let result = truncate_prompt(prompt, 10);
887        assert!(
888            result.ends_with("..."),
889            "truncated prompt must end with '...'"
890        );
891        // floor_char_boundary(10) for 4-byte chars lands at 8 (2 full emojis = 8 bytes)
892        assert_eq!(result.len(), 8 + 3, "2 emojis (8 bytes) + '...' (3 bytes)");
893        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
894    }
895
896    #[tokio::test]
897    async fn test_assign_to_community_majority_vote() {
898        let store = setup().await;
899
900        // Setup: community C1 with members [A, B], then add D with edges to both A and B.
901        let a = store
902            .upsert_entity("AA", "AA", EntityType::Concept, None)
903            .await
904            .unwrap();
905        let b = store
906            .upsert_entity("BB", "BB", EntityType::Concept, None)
907            .await
908            .unwrap();
909        let d = store
910            .upsert_entity("DD", "DD", EntityType::Concept, None)
911            .await
912            .unwrap();
913
914        store
915            .upsert_community("test_community", "summary", &[a, b], None)
916            .await
917            .unwrap();
918
919        store.insert_edge(d, a, "r", "f", 1.0, None).await.unwrap();
920        store.insert_edge(d, b, "r", "f", 1.0, None).await.unwrap();
921
922        let result = assign_to_community(&store, d).await.unwrap();
923        assert!(result.is_some());
924
925        // The returned ID must be valid for subsequent lookups (HIGH-IC-01 regression test).
926        let returned_id = result.unwrap();
927        let community = store
928            .find_community_by_id(returned_id)
929            .await
930            .unwrap()
931            .expect("returned community_id must reference an existing row");
932        assert!(
933            community.entity_ids.contains(&d),
934            "D should be added to the community"
935        );
936        // Fingerprint must be NULL after assign (cache invalidated for next detect run).
937        assert!(
938            community.fingerprint.is_none(),
939            "fingerprint must be cleared after assign_to_community"
940        );
941    }
942
943    /// #1262: Second `detect_communities` call with no graph changes must produce 0 LLM calls.
944    #[tokio::test]
945    async fn test_incremental_detection_no_changes_skips_llm() {
946        let store = setup().await;
947        let (provider, call_buf) = recording_provider();
948
949        let a = store
950            .upsert_entity("X", "X", EntityType::Concept, None)
951            .await
952            .unwrap();
953        let b = store
954            .upsert_entity("Y", "Y", EntityType::Concept, None)
955            .await
956            .unwrap();
957        store
958            .insert_edge(a, b, "r", "X relates Y", 1.0, None)
959            .await
960            .unwrap();
961
962        // First run: LLM called once to summarize the community.
963        detect_communities(&store, &provider, usize::MAX, 4, 0)
964            .await
965            .unwrap();
966        let first_calls = call_buf.lock().unwrap().len();
967        assert_eq!(first_calls, 1, "first run must produce exactly 1 LLM call");
968
969        // Second run: graph unchanged — 0 LLM calls.
970        detect_communities(&store, &provider, usize::MAX, 4, 0)
971            .await
972            .unwrap();
973        let second_calls = call_buf.lock().unwrap().len();
974        assert_eq!(
975            second_calls, first_calls,
976            "second run with no graph changes must produce 0 additional LLM calls"
977        );
978    }
979
980    /// #1262: Adding an edge changes the fingerprint — LLM must be called again.
981    #[tokio::test]
982    async fn test_incremental_detection_edge_change_triggers_resummary() {
983        let store = setup().await;
984        let (provider, call_buf) = recording_provider();
985
986        let a = store
987            .upsert_entity("P", "P", EntityType::Concept, None)
988            .await
989            .unwrap();
990        let b = store
991            .upsert_entity("Q", "Q", EntityType::Concept, None)
992            .await
993            .unwrap();
994        store
995            .insert_edge(a, b, "r", "P relates Q", 1.0, None)
996            .await
997            .unwrap();
998
999        detect_communities(&store, &provider, usize::MAX, 4, 0)
1000            .await
1001            .unwrap();
1002        let after_first = call_buf.lock().unwrap().len();
1003        assert_eq!(after_first, 1);
1004
1005        // Add a new edge within the community to change its fingerprint.
1006        store
1007            .insert_edge(b, a, "r2", "Q also relates P", 1.0, None)
1008            .await
1009            .unwrap();
1010
1011        detect_communities(&store, &provider, usize::MAX, 4, 0)
1012            .await
1013            .unwrap();
1014        let after_second = call_buf.lock().unwrap().len();
1015        assert_eq!(
1016            after_second, 2,
1017            "edge change must trigger one additional LLM call"
1018        );
1019    }
1020
1021    /// #1262: Communities whose fingerprints vanish are deleted on refresh.
1022    #[tokio::test]
1023    async fn test_incremental_detection_dissolved_community_deleted() {
1024        let store = setup().await;
1025        let provider = mock_provider();
1026
1027        let a = store
1028            .upsert_entity("M1", "M1", EntityType::Concept, None)
1029            .await
1030            .unwrap();
1031        let b = store
1032            .upsert_entity("M2", "M2", EntityType::Concept, None)
1033            .await
1034            .unwrap();
1035        let edge_id = store
1036            .insert_edge(a, b, "r", "M1 relates M2", 1.0, None)
1037            .await
1038            .unwrap();
1039
1040        detect_communities(&store, &provider, usize::MAX, 4, 0)
1041            .await
1042            .unwrap();
1043        assert_eq!(store.community_count().await.unwrap(), 1);
1044
1045        // Invalidate the edge — community dissolves.
1046        store.invalidate_edge(edge_id).await.unwrap();
1047
1048        detect_communities(&store, &provider, usize::MAX, 4, 0)
1049            .await
1050            .unwrap();
1051        assert_eq!(
1052            store.community_count().await.unwrap(),
1053            0,
1054            "dissolved community must be deleted on next refresh"
1055        );
1056    }
1057
1058    /// #1260: Sequential fallback (concurrency=1) produces correct results.
1059    #[tokio::test]
1060    async fn test_detect_communities_concurrency_one() {
1061        let store = setup().await;
1062        let provider = mock_provider();
1063
1064        let a = store
1065            .upsert_entity("C1A", "C1A", EntityType::Concept, None)
1066            .await
1067            .unwrap();
1068        let b = store
1069            .upsert_entity("C1B", "C1B", EntityType::Concept, None)
1070            .await
1071            .unwrap();
1072        store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
1073
1074        let count = detect_communities(&store, &provider, usize::MAX, 1, 0)
1075            .await
1076            .unwrap();
1077        assert_eq!(count, 1, "concurrency=1 must still detect the community");
1078        assert_eq!(store.community_count().await.unwrap(), 1);
1079    }
1080
1081    #[test]
1082    fn test_compute_fingerprint_deterministic() {
1083        let fp1 = compute_partition_fingerprint(&[1, 2, 3], &[10, 20]);
1084        let fp2 = compute_partition_fingerprint(&[3, 1, 2], &[20, 10]);
1085        assert_eq!(fp1, fp2, "fingerprint must be order-independent");
1086
1087        let fp3 = compute_partition_fingerprint(&[1, 2, 3], &[10, 30]);
1088        assert_ne!(
1089            fp1, fp3,
1090            "different edge IDs must produce different fingerprint"
1091        );
1092
1093        let fp4 = compute_partition_fingerprint(&[1, 2, 4], &[10, 20]);
1094        assert_ne!(
1095            fp1, fp4,
1096            "different entity IDs must produce different fingerprint"
1097        );
1098    }
1099
1100    /// Domain separator test: entity/edge sequences with same raw bytes must not collide.
1101    ///
1102    /// Without domain separators, entities=[1,2] edges=[3] would hash identically to
1103    /// entities=[1] edges=[2,3] (same concatenated `le_bytes`). With separators they differ.
1104    #[test]
1105    fn test_compute_fingerprint_domain_separation() {
1106        let fp_a = compute_partition_fingerprint(&[1, 2], &[3]);
1107        let fp_b = compute_partition_fingerprint(&[1], &[2, 3]);
1108        assert_ne!(
1109            fp_a, fp_b,
1110            "entity/edge sequences with same raw bytes must produce different fingerprints"
1111        );
1112    }
1113
1114    /// Chunked loading with chunk_size=1 must produce correct community assignments.
1115    ///
1116    /// Verifies: (a) community count is correct, (b) edge_facts_map and edge_id_map are fully
1117    /// populated (checked via community membership — all edges contribute to fingerprints),
1118    /// (c) the loop executes multiple iterations by using a tiny chunk size on a 3-edge graph.
1119    #[tokio::test]
1120    async fn test_detect_communities_chunked_correct_membership() {
1121        let store = setup().await;
1122        let provider = mock_provider();
1123
1124        // Build two isolated clusters: A-B-C and D-E.
1125        let a = store
1126            .upsert_entity("CA", "CA", EntityType::Concept, None)
1127            .await
1128            .unwrap();
1129        let b = store
1130            .upsert_entity("CB", "CB", EntityType::Concept, None)
1131            .await
1132            .unwrap();
1133        let c = store
1134            .upsert_entity("CC", "CC", EntityType::Concept, None)
1135            .await
1136            .unwrap();
1137        let d = store
1138            .upsert_entity("CD", "CD", EntityType::Concept, None)
1139            .await
1140            .unwrap();
1141        let e = store
1142            .upsert_entity("CE", "CE", EntityType::Concept, None)
1143            .await
1144            .unwrap();
1145
1146        store
1147            .insert_edge(a, b, "r", "A-B fact", 1.0, None)
1148            .await
1149            .unwrap();
1150        store
1151            .insert_edge(b, c, "r", "B-C fact", 1.0, None)
1152            .await
1153            .unwrap();
1154        store
1155            .insert_edge(d, e, "r", "D-E fact", 1.0, None)
1156            .await
1157            .unwrap();
1158
1159        // chunk_size=1: each edge is fetched individually — loop must execute 3 times.
1160        let count_chunked = detect_communities(&store, &provider, usize::MAX, 4, 1)
1161            .await
1162            .unwrap();
1163        assert_eq!(
1164            count_chunked, 2,
1165            "chunked loading must detect both communities"
1166        );
1167
1168        // Verify communities contain the correct members.
1169        let communities = store.all_communities().await.unwrap();
1170        assert_eq!(communities.len(), 2);
1171
1172        let abc_ids = [a, b, c];
1173        let de_ids = [d, e];
1174        let has_abc = communities
1175            .iter()
1176            .any(|c| abc_ids.iter().all(|id| c.entity_ids.contains(id)));
1177        let has_de = communities
1178            .iter()
1179            .any(|c| de_ids.iter().all(|id| c.entity_ids.contains(id)));
1180        assert!(has_abc, "cluster A-B-C must form a community");
1181        assert!(has_de, "cluster D-E must form a community");
1182    }
1183
1184    /// chunk_size=usize::MAX must load all edges in a single query and produce correct results.
1185    #[tokio::test]
1186    async fn test_detect_communities_chunk_size_max() {
1187        let store = setup().await;
1188        let provider = mock_provider();
1189
1190        let x = store
1191            .upsert_entity("MX", "MX", EntityType::Concept, None)
1192            .await
1193            .unwrap();
1194        let y = store
1195            .upsert_entity("MY", "MY", EntityType::Concept, None)
1196            .await
1197            .unwrap();
1198        store
1199            .insert_edge(x, y, "r", "X-Y fact", 1.0, None)
1200            .await
1201            .unwrap();
1202
1203        let count = detect_communities(&store, &provider, usize::MAX, 4, usize::MAX)
1204            .await
1205            .unwrap();
1206        assert_eq!(count, 1, "chunk_size=usize::MAX must detect the community");
1207    }
1208
1209    /// chunk_size=0 falls back to the stream path without panicking.
1210    #[tokio::test]
1211    async fn test_detect_communities_chunk_size_zero_fallback() {
1212        let store = setup().await;
1213        let provider = mock_provider();
1214
1215        let p = store
1216            .upsert_entity("ZP", "ZP", EntityType::Concept, None)
1217            .await
1218            .unwrap();
1219        let q = store
1220            .upsert_entity("ZQ", "ZQ", EntityType::Concept, None)
1221            .await
1222            .unwrap();
1223        store
1224            .insert_edge(p, q, "r", "P-Q fact", 1.0, None)
1225            .await
1226            .unwrap();
1227
1228        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
1229            .await
1230            .unwrap();
1231        assert_eq!(
1232            count, 1,
1233            "chunk_size=0 must detect the community via stream fallback"
1234        );
1235    }
1236
1237    /// Verifies that edge_facts_map is fully populated during chunked loading by checking
1238    /// that the community fingerprint changes when a new edge is added (fingerprint includes
1239    /// edge IDs, so any missed edges would produce a different or stale fingerprint).
1240    #[tokio::test]
1241    async fn test_detect_communities_chunked_edge_map_complete() {
1242        let store = setup().await;
1243        let (provider, call_buf) = recording_provider();
1244
1245        let a = store
1246            .upsert_entity("FA", "FA", EntityType::Concept, None)
1247            .await
1248            .unwrap();
1249        let b = store
1250            .upsert_entity("FB", "FB", EntityType::Concept, None)
1251            .await
1252            .unwrap();
1253        store
1254            .insert_edge(a, b, "r", "edge1 fact", 1.0, None)
1255            .await
1256            .unwrap();
1257
1258        // First detection with chunk_size=1.
1259        detect_communities(&store, &provider, usize::MAX, 4, 1)
1260            .await
1261            .unwrap();
1262        let calls_after_first = call_buf.lock().unwrap().len();
1263        assert_eq!(calls_after_first, 1, "first run must trigger 1 LLM call");
1264
1265        // Add another edge — fingerprint must change, triggering a second LLM call.
1266        store
1267            .insert_edge(b, a, "r2", "edge2 fact", 1.0, None)
1268            .await
1269            .unwrap();
1270
1271        detect_communities(&store, &provider, usize::MAX, 4, 1)
1272            .await
1273            .unwrap();
1274        let calls_after_second = call_buf.lock().unwrap().len();
1275        assert_eq!(
1276            calls_after_second, 2,
1277            "adding an edge must change fingerprint and trigger re-summarization"
1278        );
1279    }
1280}