Skip to main content

zeph_memory/graph/
community.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use futures::TryStreamExt as _;
8use petgraph::Graph;
9use petgraph::graph::NodeIndex;
10use tokio::sync::Semaphore;
11use tokio::task::JoinSet;
12use zeph_llm::LlmProvider as _;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{Message, Role};
15
16use crate::error::MemoryError;
17
18use super::store::GraphStore;
19
/// Upper bound on label-propagation sweeps over the entity graph. Propagation stops
/// earlier as soon as a full sweep leaves every label unchanged.
const MAX_LABEL_PROPAGATION_ITERATIONS: usize = 50;
21
/// Strip control characters, line/paragraph separators, Unicode bidi controls, and
/// zero-width/invisible characters from `s`.
///
/// This prevents prompt injection via entity names or edge facts sourced from untrusted text.
///
/// Filtered categories:
/// - All Unicode control characters (`Cc` category, covers ASCII controls such as `\n`,
///   `\r`, `\t`, plus C1 controls like NEL U+0085)
/// - Line and paragraph separators: U+2028, U+2029. They are not `Cc`, but Unicode
///   treats them as line breaks, so without this they would bypass the newline stripping
///   above and let a fact break out of the prompt's `- fact` list structure.
/// - Bidi control characters: U+061C (Arabic letter mark), U+200E–U+200F,
///   U+202A–U+202E, U+2066–U+2069
/// - Zero-width and invisible characters: U+200B–U+200D, U+2060–U+2064 (word joiner and
///   invisible operators)
/// - Byte-order mark / zero-width no-break space: U+FEFF
fn scrub_content(s: &str) -> String {
    s.chars()
        .filter(|&c| {
            !c.is_control()
                && !matches!(
                    c,
                    '\u{061C}'
                        | '\u{200B}'..='\u{200F}'
                        | '\u{2028}'..='\u{2029}'
                        | '\u{202A}'..='\u{202E}'
                        | '\u{2060}'..='\u{2064}'
                        | '\u{2066}'..='\u{2069}'
                        | '\u{FEFF}'
                )
        })
        .collect()
}
40
/// Stats returned from graph eviction (`run_graph_eviction`).
///
/// Plain counters, so the type is cheap to clone and can be compared directly, e.g. in
/// tests or when diffing two eviction runs.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct GraphEvictionStats {
    /// Number of expired edges deleted.
    pub expired_edges_deleted: usize,
    /// Number of orphan entities deleted.
    pub orphan_entities_deleted: usize,
    /// Number of entities deleted to enforce the entity cap (`0` when the cap is disabled).
    pub capped_entities_deleted: usize,
}
48
/// Cap `prompt` at `max_bytes` bytes of content, cutting on a UTF-8 character boundary.
///
/// - `max_bytes == 0` yields an empty string (this is how community summaries are disabled).
/// - A prompt that already fits is returned unchanged, without reallocating.
/// - Otherwise the result is the longest prefix of at most `max_bytes` bytes that ends on a
///   char boundary, followed by `"..."`. It may therefore be up to `max_bytes + 3` bytes long.
fn truncate_prompt(mut prompt: String, max_bytes: usize) -> String {
    if max_bytes == 0 {
        return String::new();
    }
    if prompt.len() > max_bytes {
        // Walk back from `max_bytes` to the nearest char boundary. Index 0 is always a
        // boundary, so the loop terminates.
        let mut cut = max_bytes;
        while !prompt.is_char_boundary(cut) {
            cut -= 1;
        }
        prompt.truncate(cut);
        prompt.push_str("...");
    }
    prompt
}
64
65/// Compute a BLAKE3 fingerprint for a community partition.
66///
67/// The fingerprint is derived from sorted entity IDs and sorted intra-community edge IDs,
68/// ensuring both membership and edge mutations trigger re-summarization.
69/// BLAKE3 is used (not `DefaultHasher`) to guarantee determinism across process restarts.
70fn compute_partition_fingerprint(entity_ids: &[i64], intra_edge_ids: &[i64]) -> String {
71    let mut hasher = blake3::Hasher::new();
72    let mut sorted_entities = entity_ids.to_vec();
73    sorted_entities.sort_unstable();
74    hasher.update(b"entities");
75    for id in &sorted_entities {
76        hasher.update(&id.to_le_bytes());
77    }
78    let mut sorted_edges = intra_edge_ids.to_vec();
79    sorted_edges.sort_unstable();
80    hasher.update(b"edges");
81    for id in &sorted_edges {
82        hasher.update(&id.to_le_bytes());
83    }
84    hasher.finalize().to_hex().to_string()
85}
86
/// Per-community data collected before spawning LLM summarization tasks.
///
/// Built by `detect_communities` only for communities whose partition fingerprint is not
/// already stored, i.e. those that need a fresh summary.
struct CommunityData {
    /// Member entity IDs, in graph node order.
    entity_ids: Vec<i64>,
    /// Member entity names (members without a known name are skipped), passed through
    /// `scrub_content`.
    entity_names: Vec<String>,
    /// Facts of edges whose source and target are both members, passed through
    /// `scrub_content`.
    intra_facts: Vec<String>,
    /// `compute_partition_fingerprint` of the member IDs and intra-community edge IDs.
    fingerprint: String,
    /// Display name: up to three entity names joined with `", "`, suffixed with
    /// ` [label_index]` to avoid name collisions.
    name: String,
}
95
96/// Run label propagation on the full entity graph, generate community summaries via LLM,
97/// and upsert results to `SQLite`.
98///
99/// Returns the number of communities detected (with `>= 2` entities).
100///
101/// Unchanged communities (same entity membership and intra-community edges) are skipped —
102/// their existing summaries are preserved without LLM calls (incremental detection, #1262).
103/// LLM calls for changed communities are parallelized via a `JoinSet` bounded by a
104/// semaphore with `concurrency` permits (#1260).
105///
106/// # Panics
107///
108/// Does not panic in normal operation. The `semaphore.acquire().await.expect(...)` call is
109/// infallible because the semaphore is never closed during the lifetime of this function.
110///
111/// # Errors
112///
113/// Returns an error if `SQLite` queries or LLM calls fail.
114#[allow(clippy::too_many_lines)]
115pub async fn detect_communities(
116    store: &GraphStore,
117    provider: &AnyProvider,
118    community_summary_max_prompt_bytes: usize,
119    concurrency: usize,
120) -> Result<usize, MemoryError> {
121    let entities = store.all_entities().await?;
122    if entities.len() < 2 {
123        return Ok(0);
124    }
125
126    // Build undirected graph: node weight = entity_id, no edge weight.
127    // Tie-breaking in label propagation is deterministic for a given dataset
128    // (labels are NodeIndex values assigned in ORDER BY id ASC order), but may
129    // vary if entity IDs change after deletion/re-insertion.
130    let mut graph = Graph::<i64, (), petgraph::Undirected>::new_undirected();
131    let mut node_map: HashMap<i64, NodeIndex> = HashMap::new();
132
133    for entity in &entities {
134        let idx = graph.add_node(entity.id);
135        node_map.insert(entity.id, idx);
136    }
137
138    let edges: Vec<_> = store.all_active_edges_stream().try_collect().await?;
139    for edge in &edges {
140        if let (Some(&src_idx), Some(&tgt_idx)) = (
141            node_map.get(&edge.source_entity_id),
142            node_map.get(&edge.target_entity_id),
143        ) {
144            graph.add_edge(src_idx, tgt_idx, ());
145        }
146    }
147
148    // Label propagation: each node starts with its own NodeIndex as label.
149    let mut labels: Vec<usize> = (0..graph.node_count()).collect();
150
151    for _ in 0..MAX_LABEL_PROPAGATION_ITERATIONS {
152        let mut changed = false;
153        for node_idx in graph.node_indices() {
154            let neighbors: Vec<NodeIndex> = graph.neighbors(node_idx).collect();
155            if neighbors.is_empty() {
156                continue;
157            }
158
159            let mut freq: HashMap<usize, usize> = HashMap::new();
160            for &nbr in &neighbors {
161                *freq.entry(labels[nbr.index()]).or_insert(0) += 1;
162            }
163
164            // neighbors is non-empty, so freq is non-empty — max and min are safe.
165            let max_count = *freq.values().max().unwrap_or(&0);
166            // Tie-break: smallest label value among tied candidates (deterministic).
167            let best_label = freq
168                .iter()
169                .filter(|&(_, count)| *count == max_count)
170                .map(|(&label, _)| label)
171                .min()
172                .unwrap_or(labels[node_idx.index()]);
173
174            if labels[node_idx.index()] != best_label {
175                labels[node_idx.index()] = best_label;
176                changed = true;
177            }
178        }
179        if !changed {
180            break;
181        }
182    }
183
184    // Group entities by final label.
185    let mut communities: HashMap<usize, Vec<i64>> = HashMap::new();
186    for node_idx in graph.node_indices() {
187        let entity_id = graph[node_idx];
188        communities
189            .entry(labels[node_idx.index()])
190            .or_default()
191            .push(entity_id);
192    }
193
194    // Keep only communities with >= 2 entities.
195    communities.retain(|_, members| members.len() >= 2);
196
197    // Build entity name lookup for summary generation.
198    let entity_name_map: HashMap<i64, &str> =
199        entities.iter().map(|e| (e.id, e.name.as_str())).collect();
200
201    // Build edge fact lookup and edge ID lookup indexed by entity pair.
202    let mut edge_facts_map: HashMap<(i64, i64), Vec<String>> = HashMap::new();
203    let mut edge_id_map: HashMap<(i64, i64), Vec<i64>> = HashMap::new();
204    for edge in &edges {
205        let key = (edge.source_entity_id, edge.target_entity_id);
206        edge_facts_map
207            .entry(key)
208            .or_default()
209            .push(edge.fact.clone());
210        edge_id_map.entry(key).or_default().push(edge.id);
211    }
212
213    // Load stored fingerprints: fingerprint -> community_id (for unchanged partition detection).
214    let stored_fingerprints = store.community_fingerprints().await?;
215
216    // Sort community labels for stable label_index assignment (HIGH-02 fix).
217    let mut sorted_labels: Vec<usize> = communities.keys().copied().collect();
218    sorted_labels.sort_unstable();
219
220    // Prepare per-community data: compute fingerprints, classify into changed/unchanged.
221    let mut to_summarize: Vec<CommunityData> = Vec::new();
222    let mut unchanged_count = 0usize;
223    let mut new_fingerprints: std::collections::HashSet<String> = std::collections::HashSet::new();
224
225    for (label_index, &label) in sorted_labels.iter().enumerate() {
226        let entity_ids = communities[&label].as_slice();
227        let member_set: std::collections::HashSet<i64> = entity_ids.iter().copied().collect();
228
229        let mut intra_facts: Vec<String> = Vec::new();
230        let mut intra_edge_ids: Vec<i64> = Vec::new();
231        for (&(src, tgt), facts) in &edge_facts_map {
232            if member_set.contains(&src) && member_set.contains(&tgt) {
233                intra_facts.extend(facts.iter().map(|f| scrub_content(f)));
234                if let Some(ids) = edge_id_map.get(&(src, tgt)) {
235                    intra_edge_ids.extend_from_slice(ids);
236                }
237            }
238        }
239
240        let fingerprint = compute_partition_fingerprint(entity_ids, &intra_edge_ids);
241        new_fingerprints.insert(fingerprint.clone());
242
243        if stored_fingerprints.contains_key(&fingerprint) {
244            // Partition unchanged — no LLM call needed.
245            unchanged_count += 1;
246            continue;
247        }
248
249        let entity_names: Vec<String> = entity_ids
250            .iter()
251            .filter_map(|id| entity_name_map.get(id).map(|&s| scrub_content(s)))
252            .collect();
253
254        // Append label_index to prevent ON CONFLICT(name) collisions when two communities
255        // share the same top-3 entity names across detect_communities runs (IC-SIG-02).
256        let base_name = entity_names
257            .iter()
258            .take(3)
259            .cloned()
260            .collect::<Vec<_>>()
261            .join(", ");
262        let name = format!("{base_name} [{label_index}]");
263
264        to_summarize.push(CommunityData {
265            entity_ids: entity_ids.to_vec(),
266            entity_names,
267            intra_facts,
268            fingerprint,
269            name,
270        });
271    }
272
273    tracing::debug!(
274        total = sorted_labels.len(),
275        unchanged = unchanged_count,
276        to_summarize = to_summarize.len(),
277        "community detection: partition classification complete"
278    );
279
280    // Delete dissolved communities — those whose fingerprints no longer appear in the new
281    // partition set. Only applicable when stored fingerprints exist (not the first run).
282    for (stored_fp, community_id) in &stored_fingerprints {
283        if !new_fingerprints.contains(stored_fp.as_str()) {
284            store.delete_community_by_id(*community_id).await?;
285        }
286    }
287
288    // Spawn LLM summarization tasks concurrently, bounded by semaphore.
289    let semaphore = Arc::new(Semaphore::new(concurrency.max(1)));
290    let mut join_set: JoinSet<(String, String, Vec<i64>, String)> = JoinSet::new();
291
292    for data in to_summarize {
293        let provider = provider.clone();
294        let sem = Arc::clone(&semaphore);
295        let max_bytes = community_summary_max_prompt_bytes;
296        join_set.spawn(async move {
297            let _permit = sem.acquire().await.expect("semaphore is never closed");
298            let summary = match generate_community_summary(
299                &provider,
300                &data.entity_names,
301                &data.intra_facts,
302                max_bytes,
303            )
304            .await
305            {
306                Ok(text) => text,
307                Err(e) => {
308                    tracing::warn!(
309                        community = %data.name,
310                        "community summary generation failed: {e:#}"
311                    );
312                    String::new()
313                }
314            };
315            (data.name, summary, data.entity_ids, data.fingerprint)
316        });
317    }
318
319    // Collect results — handle task panics explicitly (HIGH-01 fix).
320    let mut results: Vec<(String, String, Vec<i64>, String)> = Vec::new();
321    while let Some(outcome) = join_set.join_next().await {
322        match outcome {
323            Ok(tuple) => results.push(tuple),
324            Err(e) => {
325                tracing::error!(
326                    panicked = e.is_panic(),
327                    cancelled = e.is_cancelled(),
328                    "community summary task failed"
329                );
330            }
331        }
332    }
333
334    // Sort results for deterministic upsert order.
335    results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
336
337    // Upsert sequentially (SQLite is single-writer).
338    let mut count = unchanged_count;
339    for (name, summary, entity_ids, fingerprint) in results {
340        store
341            .upsert_community(&name, &summary, &entity_ids, Some(&fingerprint))
342            .await?;
343        count += 1;
344    }
345
346    Ok(count)
347}
348
349/// Assign a single entity to an existing community via neighbor majority vote.
350///
351/// Returns `Some(community_id)` if assigned, `None` if no neighbors have communities.
352///
353/// When an entity is added, the stored fingerprint is cleared (`NULL`) so the next
354/// `detect_communities` run will re-summarize the affected community (CRIT-02 fix).
355///
356/// # Errors
357///
358/// Returns an error if `SQLite` queries fail.
359pub async fn assign_to_community(
360    store: &GraphStore,
361    entity_id: i64,
362) -> Result<Option<i64>, MemoryError> {
363    let edges = store.edges_for_entity(entity_id).await?;
364    if edges.is_empty() {
365        return Ok(None);
366    }
367
368    let neighbor_ids: Vec<i64> = edges
369        .iter()
370        .map(|e| {
371            if e.source_entity_id == entity_id {
372                e.target_entity_id
373            } else {
374                e.source_entity_id
375            }
376        })
377        .collect();
378
379    let mut community_votes: HashMap<i64, usize> = HashMap::new();
380    for &nbr_id in &neighbor_ids {
381        if let Some(community) = store.community_for_entity(nbr_id).await? {
382            *community_votes.entry(community.id).or_insert(0) += 1;
383        }
384    }
385
386    if community_votes.is_empty() {
387        return Ok(None);
388    }
389
390    // Majority vote — tie-break by smallest community_id.
391    // community_votes is non-empty (checked above), so max_by always returns Some.
392    let Some((&best_community_id, _)) =
393        community_votes
394            .iter()
395            .max_by(|&(&id_a, &count_a), &(&id_b, &count_b)| {
396                count_a.cmp(&count_b).then(id_b.cmp(&id_a))
397            })
398    else {
399        return Ok(None);
400    };
401
402    if let Some(mut target) = store.find_community_by_id(best_community_id).await? {
403        if !target.entity_ids.contains(&entity_id) {
404            target.entity_ids.push(entity_id);
405            store
406                .upsert_community(&target.name, &target.summary, &target.entity_ids, None)
407                .await?;
408            // Clear fingerprint to invalidate cache — next detect_communities will re-summarize.
409            store.clear_community_fingerprint(best_community_id).await?;
410        }
411        return Ok(Some(best_community_id));
412    }
413
414    Ok(None)
415}
416
/// Remove `Qdrant` points for entities that no longer exist in `SQLite`.
///
/// **Not implemented yet:** this is currently a no-op placeholder. It performs no `Qdrant`
/// or `SQLite` access and always returns `Ok(0)`; both parameters are unused (hence the
/// leading underscores).
///
/// Once implemented, it is intended to return the number of stale points deleted.
///
/// # Errors
///
/// The current placeholder never returns an error. Once implemented, it is expected to
/// return an error if `Qdrant` operations fail.
pub async fn cleanup_stale_entity_embeddings(
    _store: &GraphStore,
    _embeddings: &crate::embedding_store::EmbeddingStore,
) -> Result<usize, MemoryError> {
    // TODO: implement when EmbeddingStore exposes a scroll_all API
    // (follow-up: add pub async fn scroll_all(&self, collection, key_field) delegating to
    // self.ops.scroll_all). Then enumerate Qdrant points, collect IDs where entity_id is
    // not in SQLite, and delete stale points.
    Ok(0)
}
434
435/// Run graph eviction: clean expired edges, orphan entities, and cap entity count.
436///
437/// # Errors
438///
439/// Returns an error if `SQLite` queries fail.
440pub async fn run_graph_eviction(
441    store: &GraphStore,
442    expired_edge_retention_days: u32,
443    max_entities: usize,
444) -> Result<GraphEvictionStats, MemoryError> {
445    let expired_edges_deleted = store
446        .delete_expired_edges(expired_edge_retention_days)
447        .await?;
448    let orphan_entities_deleted = store
449        .delete_orphan_entities(expired_edge_retention_days)
450        .await?;
451    let capped_entities_deleted = if max_entities > 0 {
452        store.cap_entities(max_entities).await?
453    } else {
454        0
455    };
456
457    Ok(GraphEvictionStats {
458        expired_edges_deleted,
459        orphan_entities_deleted,
460        capped_entities_deleted,
461    })
462}
463
464async fn generate_community_summary(
465    provider: &AnyProvider,
466    entity_names: &[String],
467    edge_facts: &[String],
468    max_prompt_bytes: usize,
469) -> Result<String, MemoryError> {
470    let entities_str = entity_names.join(", ");
471    // Cap facts at 20 to bound prompt size; data is already scrubbed upstream.
472    let facts_str = edge_facts
473        .iter()
474        .take(20)
475        .map(|f| format!("- {f}"))
476        .collect::<Vec<_>>()
477        .join("\n");
478
479    let raw_prompt = format!(
480        "Summarize the following group of related entities and their relationships \
481         into a single paragraph (2-3 sentences). Focus on the theme that connects \
482         them and the key relationships.\n\nEntities: {entities_str}\n\
483         Relationships:\n{facts_str}\n\nSummary:"
484    );
485
486    let original_bytes = raw_prompt.len();
487    let truncated = raw_prompt.len() > max_prompt_bytes;
488    let prompt = truncate_prompt(raw_prompt, max_prompt_bytes);
489    if prompt.is_empty() {
490        return Ok(String::new());
491    }
492    if truncated {
493        tracing::warn!(
494            entity_count = entity_names.len(),
495            original_bytes,
496            truncated_bytes = prompt.len(),
497            "community summary prompt truncated"
498        );
499    }
500
501    let messages = [Message::from_legacy(Role::User, prompt)];
502    let response: String = provider.chat(&messages).await.map_err(MemoryError::Llm)?;
503    Ok(response)
504}
505
506#[cfg(test)]
507mod tests {
508    use std::sync::{Arc, Mutex};
509
510    use super::*;
511    use crate::graph::types::EntityType;
512    use crate::sqlite::SqliteStore;
513
514    async fn setup() -> GraphStore {
515        let store = SqliteStore::new(":memory:").await.unwrap();
516        GraphStore::new(store.pool().clone())
517    }
518
519    fn mock_provider() -> AnyProvider {
520        AnyProvider::Mock(zeph_llm::mock::MockProvider::default())
521    }
522
523    fn recording_provider() -> (
524        AnyProvider,
525        Arc<Mutex<Vec<Vec<zeph_llm::provider::Message>>>>,
526    ) {
527        let (mock, buf) = zeph_llm::mock::MockProvider::default().with_recording();
528        (AnyProvider::Mock(mock), buf)
529    }
530
531    #[tokio::test]
532    async fn test_detect_communities_empty_graph() {
533        let store = setup().await;
534        let provider = mock_provider();
535        let count = detect_communities(&store, &provider, usize::MAX, 4)
536            .await
537            .unwrap();
538        assert_eq!(count, 0);
539    }
540
541    #[tokio::test]
542    async fn test_detect_communities_single_entity() {
543        let store = setup().await;
544        let provider = mock_provider();
545        store
546            .upsert_entity("Solo", "Solo", EntityType::Concept, None)
547            .await
548            .unwrap();
549        let count = detect_communities(&store, &provider, usize::MAX, 4)
550            .await
551            .unwrap();
552        assert_eq!(count, 0, "single isolated entity must not form a community");
553    }
554
555    #[tokio::test]
556    async fn test_single_entity_community_filtered() {
557        let store = setup().await;
558        let provider = mock_provider();
559
560        // Create 3 connected entities (cluster A) and 1 isolated entity.
561        let a = store
562            .upsert_entity("A", "A", EntityType::Concept, None)
563            .await
564            .unwrap();
565        let b = store
566            .upsert_entity("B", "B", EntityType::Concept, None)
567            .await
568            .unwrap();
569        let c = store
570            .upsert_entity("C", "C", EntityType::Concept, None)
571            .await
572            .unwrap();
573        let _iso = store
574            .upsert_entity("Isolated", "Isolated", EntityType::Concept, None)
575            .await
576            .unwrap();
577
578        store
579            .insert_edge(a, b, "r", "A relates B", 1.0, None)
580            .await
581            .unwrap();
582        store
583            .insert_edge(b, c, "r", "B relates C", 1.0, None)
584            .await
585            .unwrap();
586
587        let count = detect_communities(&store, &provider, usize::MAX, 4)
588            .await
589            .unwrap();
590        // Isolated entity has no edges — must NOT be persisted as a community.
591        assert_eq!(count, 1, "only the 3-entity cluster should be detected");
592
593        let communities = store.all_communities().await.unwrap();
594        assert_eq!(communities.len(), 1);
595        assert!(
596            !communities[0].entity_ids.contains(&_iso),
597            "isolated entity must not be in any community"
598        );
599    }
600
601    #[tokio::test]
602    async fn test_label_propagation_basic() {
603        let store = setup().await;
604        let provider = mock_provider();
605
606        // Create 4 clusters of 3 entities each (12 entities total), fully isolated.
607        let mut cluster_ids: Vec<Vec<i64>> = Vec::new();
608        for cluster in 0..4_i64 {
609            let mut ids = Vec::new();
610            for node in 0..3_i64 {
611                let name = format!("c{cluster}_n{node}");
612                let id = store
613                    .upsert_entity(&name, &name, EntityType::Concept, None)
614                    .await
615                    .unwrap();
616                ids.push(id);
617            }
618            // Connect nodes within cluster (chain: 0-1-2).
619            store
620                .insert_edge(ids[0], ids[1], "r", "f", 1.0, None)
621                .await
622                .unwrap();
623            store
624                .insert_edge(ids[1], ids[2], "r", "f", 1.0, None)
625                .await
626                .unwrap();
627            cluster_ids.push(ids);
628        }
629
630        let count = detect_communities(&store, &provider, usize::MAX, 4)
631            .await
632            .unwrap();
633        assert_eq!(count, 4, "expected 4 communities, one per cluster");
634
635        let communities = store.all_communities().await.unwrap();
636        assert_eq!(communities.len(), 4);
637
638        // Each cluster's entity IDs must appear in exactly one community.
639        for ids in &cluster_ids {
640            let found = communities
641                .iter()
642                .filter(|c| ids.iter().any(|id| c.entity_ids.contains(id)))
643                .count();
644            assert_eq!(
645                found, 1,
646                "all nodes of a cluster must be in the same community"
647            );
648        }
649    }
650
651    #[tokio::test]
652    async fn test_all_isolated_nodes() {
653        let store = setup().await;
654        let provider = mock_provider();
655
656        // Insert 5 entities with no edges at all.
657        for i in 0..5_i64 {
658            store
659                .upsert_entity(
660                    &format!("iso_{i}"),
661                    &format!("iso_{i}"),
662                    EntityType::Concept,
663                    None,
664                )
665                .await
666                .unwrap();
667        }
668
669        let count = detect_communities(&store, &provider, usize::MAX, 4)
670            .await
671            .unwrap();
672        assert_eq!(count, 0, "zero-edge graph must produce no communities");
673        assert_eq!(store.community_count().await.unwrap(), 0);
674    }
675
676    #[tokio::test]
677    async fn test_eviction_expired_edges() {
678        let store = setup().await;
679
680        let a = store
681            .upsert_entity("EA", "EA", EntityType::Concept, None)
682            .await
683            .unwrap();
684        let b = store
685            .upsert_entity("EB", "EB", EntityType::Concept, None)
686            .await
687            .unwrap();
688        let edge_id = store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
689        store.invalidate_edge(edge_id).await.unwrap();
690
691        // Manually set expired_at to a date far in the past to trigger deletion.
692        sqlx::query(
693            "UPDATE graph_edges SET expired_at = datetime('now', '-200 days') WHERE id = ?1",
694        )
695        .bind(edge_id)
696        .execute(store.pool())
697        .await
698        .unwrap();
699
700        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
701        assert_eq!(stats.expired_edges_deleted, 1);
702    }
703
704    #[tokio::test]
705    async fn test_eviction_orphan_entities() {
706        let store = setup().await;
707
708        let iso = store
709            .upsert_entity("Orphan", "Orphan", EntityType::Concept, None)
710            .await
711            .unwrap();
712
713        // Set last_seen_at to far in the past.
714        sqlx::query(
715            "UPDATE graph_entities SET last_seen_at = datetime('now', '-200 days') WHERE id = ?1",
716        )
717        .bind(iso)
718        .execute(store.pool())
719        .await
720        .unwrap();
721
722        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
723        assert_eq!(stats.orphan_entities_deleted, 1);
724    }
725
726    #[tokio::test]
727    async fn test_eviction_entity_cap() {
728        let store = setup().await;
729
730        // Insert 5 entities with no edges (so they can be capped).
731        for i in 0..5_i64 {
732            let name = format!("cap_entity_{i}");
733            store
734                .upsert_entity(&name, &name, EntityType::Concept, None)
735                .await
736                .unwrap();
737        }
738
739        let stats = run_graph_eviction(&store, 90, 3).await.unwrap();
740        assert_eq!(
741            stats.capped_entities_deleted, 2,
742            "should delete 5-3=2 entities"
743        );
744        assert_eq!(store.entity_count().await.unwrap(), 3);
745    }
746
747    #[tokio::test]
748    async fn test_assign_to_community_no_neighbors() {
749        let store = setup().await;
750        let entity_id = store
751            .upsert_entity("Loner", "Loner", EntityType::Concept, None)
752            .await
753            .unwrap();
754
755        let result = assign_to_community(&store, entity_id).await.unwrap();
756        assert!(result.is_none());
757    }
758
759    #[tokio::test]
760    async fn test_extraction_count_persistence() {
761        use tempfile::NamedTempFile;
762        // Create a real on-disk SQLite DB to verify persistence across store instances.
763        let tmp = NamedTempFile::new().unwrap();
764        let path = tmp.path().to_str().unwrap().to_owned();
765
766        let store1 = {
767            let s = crate::sqlite::SqliteStore::new(&path).await.unwrap();
768            GraphStore::new(s.pool().clone())
769        };
770
771        store1.set_metadata("extraction_count", "0").await.unwrap();
772        for i in 1..=5_i64 {
773            store1
774                .set_metadata("extraction_count", &i.to_string())
775                .await
776                .unwrap();
777        }
778
779        // Open a second handle to the same file and verify the value persists.
780        let store2 = {
781            let s = crate::sqlite::SqliteStore::new(&path).await.unwrap();
782            GraphStore::new(s.pool().clone())
783        };
784        assert_eq!(store2.extraction_count().await.unwrap(), 5);
785    }
786
787    #[test]
788    fn test_scrub_content_ascii_control() {
789        // Newline, carriage return, null byte, tab (all ASCII control chars) must be stripped.
790        let input = "hello\nworld\r\x00\x01\x09end";
791        assert_eq!(scrub_content(input), "helloworldend");
792    }
793
794    #[test]
795    fn test_scrub_content_bidi_overrides() {
796        // U+202A LEFT-TO-RIGHT EMBEDDING, U+202E RIGHT-TO-LEFT OVERRIDE,
797        // U+2066 LEFT-TO-RIGHT ISOLATE, U+2069 POP DIRECTIONAL ISOLATE.
798        let input = format!("safe\u{202A}inject\u{202E}end\u{2066}iso\u{2069}done");
799        assert_eq!(scrub_content(&input), "safeinjectendisodone");
800    }
801
802    #[test]
803    fn test_scrub_content_zero_width() {
804        // U+200B ZERO WIDTH SPACE, U+200C ZERO WIDTH NON-JOINER, U+200D ZERO WIDTH JOINER,
805        // U+200F RIGHT-TO-LEFT MARK.
806        let input = format!("a\u{200B}b\u{200C}c\u{200D}d\u{200F}e");
807        assert_eq!(scrub_content(&input), "abcde");
808    }
809
810    #[test]
811    fn test_scrub_content_bom() {
812        // U+FEFF BYTE ORDER MARK must be stripped.
813        let input = format!("\u{FEFF}hello");
814        assert_eq!(scrub_content(&input), "hello");
815    }
816
817    #[test]
818    fn test_scrub_content_clean_string_unchanged() {
819        let input = "Hello, World! 123 — normal text.";
820        assert_eq!(scrub_content(input), input);
821    }
822
823    #[test]
824    fn test_truncate_prompt_within_limit() {
825        let result = truncate_prompt("short".into(), 100);
826        assert_eq!(result, "short");
827    }
828
829    #[test]
830    fn test_truncate_prompt_zero_max_bytes() {
831        let result = truncate_prompt("hello".into(), 0);
832        assert_eq!(result, "");
833    }
834
835    #[test]
836    fn test_truncate_prompt_long_facts() {
837        let facts: Vec<String> = (0..20)
838            .map(|i| format!("fact_{i}_{}", "x".repeat(20)))
839            .collect();
840        let prompt = facts.join("\n");
841        let result = truncate_prompt(prompt, 200);
842        assert!(
843            result.ends_with("..."),
844            "truncated prompt must end with '...'"
845        );
846        // byte length must be at most max_bytes + 3 (the "..." suffix)
847        assert!(result.len() <= 203);
848        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
849    }
850
#[test]
fn test_truncate_prompt_utf8_boundary() {
    // Each '🔥' is 4 bytes; 100 emojis = 400 bytes.
    let prompt = "🔥".repeat(100);
    let result = truncate_prompt(prompt, 10);
    assert!(
        result.ends_with("..."),
        "truncated prompt must end with '...'"
    );
    // floor_char_boundary(10) for 4-byte chars lands at 8 (2 full emojis = 8 bytes).
    // Byte 10 falls inside the third emoji, which must be dropped whole, never split.
    assert_eq!(result.len(), 8 + 3, "2 emojis (8 bytes) + '...' (3 bytes)");
    // A `String` is always valid UTF-8, so pin the exact content instead of
    // re-validating the encoding.
    assert_eq!(result, "🔥🔥...", "only whole emojis may precede the suffix");
}
864
865    #[tokio::test]
866    async fn test_assign_to_community_majority_vote() {
867        let store = setup().await;
868
869        // Setup: community C1 with members [A, B], then add D with edges to both A and B.
870        let a = store
871            .upsert_entity("AA", "AA", EntityType::Concept, None)
872            .await
873            .unwrap();
874        let b = store
875            .upsert_entity("BB", "BB", EntityType::Concept, None)
876            .await
877            .unwrap();
878        let d = store
879            .upsert_entity("DD", "DD", EntityType::Concept, None)
880            .await
881            .unwrap();
882
883        store
884            .upsert_community("test_community", "summary", &[a, b], None)
885            .await
886            .unwrap();
887
888        store.insert_edge(d, a, "r", "f", 1.0, None).await.unwrap();
889        store.insert_edge(d, b, "r", "f", 1.0, None).await.unwrap();
890
891        let result = assign_to_community(&store, d).await.unwrap();
892        assert!(result.is_some());
893
894        // The returned ID must be valid for subsequent lookups (HIGH-IC-01 regression test).
895        let returned_id = result.unwrap();
896        let community = store
897            .find_community_by_id(returned_id)
898            .await
899            .unwrap()
900            .expect("returned community_id must reference an existing row");
901        assert!(
902            community.entity_ids.contains(&d),
903            "D should be added to the community"
904        );
905        // Fingerprint must be NULL after assign (cache invalidated for next detect run).
906        assert!(
907            community.fingerprint.is_none(),
908            "fingerprint must be cleared after assign_to_community"
909        );
910    }
911
912    /// #1262: Second detect_communities call with no graph changes must produce 0 LLM calls.
913    #[tokio::test]
914    async fn test_incremental_detection_no_changes_skips_llm() {
915        let store = setup().await;
916        let (provider, call_buf) = recording_provider();
917
918        let a = store
919            .upsert_entity("X", "X", EntityType::Concept, None)
920            .await
921            .unwrap();
922        let b = store
923            .upsert_entity("Y", "Y", EntityType::Concept, None)
924            .await
925            .unwrap();
926        store
927            .insert_edge(a, b, "r", "X relates Y", 1.0, None)
928            .await
929            .unwrap();
930
931        // First run: LLM called once to summarize the community.
932        detect_communities(&store, &provider, usize::MAX, 4)
933            .await
934            .unwrap();
935        let first_calls = call_buf.lock().unwrap().len();
936        assert_eq!(first_calls, 1, "first run must produce exactly 1 LLM call");
937
938        // Second run: graph unchanged — 0 LLM calls.
939        detect_communities(&store, &provider, usize::MAX, 4)
940            .await
941            .unwrap();
942        let second_calls = call_buf.lock().unwrap().len();
943        assert_eq!(
944            second_calls, first_calls,
945            "second run with no graph changes must produce 0 additional LLM calls"
946        );
947    }
948
949    /// #1262: Adding an edge changes the fingerprint — LLM must be called again.
950    #[tokio::test]
951    async fn test_incremental_detection_edge_change_triggers_resummary() {
952        let store = setup().await;
953        let (provider, call_buf) = recording_provider();
954
955        let a = store
956            .upsert_entity("P", "P", EntityType::Concept, None)
957            .await
958            .unwrap();
959        let b = store
960            .upsert_entity("Q", "Q", EntityType::Concept, None)
961            .await
962            .unwrap();
963        store
964            .insert_edge(a, b, "r", "P relates Q", 1.0, None)
965            .await
966            .unwrap();
967
968        detect_communities(&store, &provider, usize::MAX, 4)
969            .await
970            .unwrap();
971        let after_first = call_buf.lock().unwrap().len();
972        assert_eq!(after_first, 1);
973
974        // Add a new edge within the community to change its fingerprint.
975        store
976            .insert_edge(b, a, "r2", "Q also relates P", 1.0, None)
977            .await
978            .unwrap();
979
980        detect_communities(&store, &provider, usize::MAX, 4)
981            .await
982            .unwrap();
983        let after_second = call_buf.lock().unwrap().len();
984        assert_eq!(
985            after_second, 2,
986            "edge change must trigger one additional LLM call"
987        );
988    }
989
990    /// #1262: Communities whose fingerprints vanish are deleted on refresh.
991    #[tokio::test]
992    async fn test_incremental_detection_dissolved_community_deleted() {
993        let store = setup().await;
994        let provider = mock_provider();
995
996        let a = store
997            .upsert_entity("M1", "M1", EntityType::Concept, None)
998            .await
999            .unwrap();
1000        let b = store
1001            .upsert_entity("M2", "M2", EntityType::Concept, None)
1002            .await
1003            .unwrap();
1004        let edge_id = store
1005            .insert_edge(a, b, "r", "M1 relates M2", 1.0, None)
1006            .await
1007            .unwrap();
1008
1009        detect_communities(&store, &provider, usize::MAX, 4)
1010            .await
1011            .unwrap();
1012        assert_eq!(store.community_count().await.unwrap(), 1);
1013
1014        // Invalidate the edge — community dissolves.
1015        store.invalidate_edge(edge_id).await.unwrap();
1016
1017        detect_communities(&store, &provider, usize::MAX, 4)
1018            .await
1019            .unwrap();
1020        assert_eq!(
1021            store.community_count().await.unwrap(),
1022            0,
1023            "dissolved community must be deleted on next refresh"
1024        );
1025    }
1026
1027    /// #1260: Sequential fallback (concurrency=1) produces correct results.
1028    #[tokio::test]
1029    async fn test_detect_communities_concurrency_one() {
1030        let store = setup().await;
1031        let provider = mock_provider();
1032
1033        let a = store
1034            .upsert_entity("C1A", "C1A", EntityType::Concept, None)
1035            .await
1036            .unwrap();
1037        let b = store
1038            .upsert_entity("C1B", "C1B", EntityType::Concept, None)
1039            .await
1040            .unwrap();
1041        store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
1042
1043        let count = detect_communities(&store, &provider, usize::MAX, 1)
1044            .await
1045            .unwrap();
1046        assert_eq!(count, 1, "concurrency=1 must still detect the community");
1047        assert_eq!(store.community_count().await.unwrap(), 1);
1048    }
1049
#[test]
fn test_compute_fingerprint_deterministic() {
    let baseline = compute_partition_fingerprint(&[1, 2, 3], &[10, 20]);

    // The same ID sets supplied in a different order must hash identically.
    let shuffled = compute_partition_fingerprint(&[3, 1, 2], &[20, 10]);
    assert_eq!(baseline, shuffled, "fingerprint must be order-independent");

    // Swapping a single edge ID must change the fingerprint.
    let other_edges = compute_partition_fingerprint(&[1, 2, 3], &[10, 30]);
    assert_ne!(
        baseline, other_edges,
        "different edge IDs must produce different fingerprint"
    );

    // Swapping a single entity ID must change the fingerprint.
    let other_entities = compute_partition_fingerprint(&[1, 2, 4], &[10, 20]);
    assert_ne!(
        baseline, other_entities,
        "different entity IDs must produce different fingerprint"
    );
}
1068
/// Guards the entity/edge boundary in the fingerprint input.
///
/// Concatenating the `le_bytes` of entities=[1,2] edges=[3] gives the same raw byte
/// stream as entities=[1] edges=[2,3]. Only a domain separator between the two
/// sequences keeps these partitions from colliding.
#[test]
fn test_compute_fingerprint_domain_separation() {
    let (two_entities, one_entity) = (
        compute_partition_fingerprint(&[1, 2], &[3]),
        compute_partition_fingerprint(&[1], &[2, 3]),
    );
    assert_ne!(
        two_entities, one_entity,
        "entity/edge sequences with same raw bytes must produce different fingerprints"
    );
}
1082}