1use std::collections::HashMap;
5use std::sync::Arc;
6
7use futures::TryStreamExt as _;
8use petgraph::Graph;
9use petgraph::graph::NodeIndex;
10use tokio::sync::Semaphore;
11use tokio::task::JoinSet;
12use zeph_llm::LlmProvider as _;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{Message, Role};
15
16use crate::error::MemoryError;
17
18use super::store::GraphStore;
19
/// Upper bound on label-propagation sweeps; the loop exits earlier as soon as
/// a full pass changes no label.
const MAX_LABEL_PROPAGATION_ITERATIONS: usize = 50;
21
/// Strips characters that can hide or reorder text in LLM prompts:
/// control characters, zero-width characters (U+200B..U+200F), bidi
/// override/isolate controls (U+202A..U+202E, U+2066..U+2069), and the
/// BOM (U+FEFF).
fn scrub_content(s: &str) -> String {
    // True for any character we refuse to forward.
    fn is_forbidden(c: char) -> bool {
        c.is_control()
            || matches!(
                c,
                '\u{200B}'..='\u{200F}'
                    | '\u{202A}'..='\u{202E}'
                    | '\u{2066}'..='\u{2069}'
                    | '\u{FEFF}'
            )
    }
    s.chars().filter(|&c| !is_forbidden(c)).collect()
}
40
/// Counters reporting what a [`run_graph_eviction`] run removed.
#[derive(Debug, Default)]
pub struct GraphEvictionStats {
    /// Edges hard-deleted because they expired past the retention window.
    pub expired_edges_deleted: usize,
    /// Entities deleted because no edges reference them anymore.
    pub orphan_entities_deleted: usize,
    /// Entities deleted to enforce the entity-count cap.
    pub capped_entities_deleted: usize,
}
48
/// Truncates `prompt` to at most `max_bytes` bytes of content, appending
/// `"..."` when truncation occurred (so the result may exceed `max_bytes`
/// by up to three bytes — callers/tests rely on this).
///
/// `max_bytes == 0` yields an empty string. The cut point is moved back to
/// the nearest UTF-8 character boundary so the slice never panics.
fn truncate_prompt(prompt: String, max_bytes: usize) -> String {
    if max_bytes == 0 {
        return String::new();
    }
    if prompt.len() <= max_bytes {
        return prompt;
    }
    // Stable equivalent of the unstable `str::floor_char_boundary`: walk
    // back from `max_bytes` until we hit a char boundary. Index 0 is always
    // a boundary, so the loop terminates.
    let mut boundary = max_bytes;
    while !prompt.is_char_boundary(boundary) {
        boundary -= 1;
    }
    format!("{}...", &prompt[..boundary])
}
64
65fn compute_partition_fingerprint(entity_ids: &[i64], intra_edge_ids: &[i64]) -> String {
71 let mut hasher = blake3::Hasher::new();
72 let mut sorted_entities = entity_ids.to_vec();
73 sorted_entities.sort_unstable();
74 hasher.update(b"entities");
75 for id in &sorted_entities {
76 hasher.update(&id.to_le_bytes());
77 }
78 let mut sorted_edges = intra_edge_ids.to_vec();
79 sorted_edges.sort_unstable();
80 hasher.update(b"edges");
81 for id in &sorted_edges {
82 hasher.update(&id.to_le_bytes());
83 }
84 hasher.finalize().to_hex().to_string()
85}
86
/// Work item describing one community that needs (re-)summarization.
struct CommunityData {
    // IDs of the member entities.
    entity_ids: Vec<i64>,
    // Scrubbed display names of the member entities.
    entity_names: Vec<String>,
    // Scrubbed facts from edges whose endpoints are both members.
    intra_facts: Vec<String>,
    // Partition fingerprint used to detect unchanged communities.
    fingerprint: String,
    // Display name: first few member names plus a label-index suffix.
    name: String,
}
95
/// Runs community detection over the whole entity graph and refreshes the
/// stored community summaries.
///
/// Pipeline:
/// 1. Load all entities and active edges into an undirected petgraph.
/// 2. Partition nodes with label propagation, bounded by
///    [`MAX_LABEL_PROPAGATION_ITERATIONS`]; ties pick the smallest label so
///    the partition is deterministic.
/// 3. Keep only communities with at least two members.
/// 4. Skip communities whose partition fingerprint is unchanged; delete
///    stored communities whose fingerprint no longer appears (dissolved).
/// 5. Summarize the changed communities concurrently via the LLM provider
///    (bounded by `concurrency`, minimum 1) and upsert the results.
///
/// Returns the number of live communities (unchanged plus re-summarized).
///
/// # Errors
/// Propagates storage errors. Individual summary failures are logged and
/// stored with an empty summary instead of failing the whole run.
#[allow(clippy::too_many_lines)]
pub async fn detect_communities(
    store: &GraphStore,
    provider: &AnyProvider,
    community_summary_max_prompt_bytes: usize,
    concurrency: usize,
) -> Result<usize, MemoryError> {
    let entities = store.all_entities().await?;
    if entities.len() < 2 {
        return Ok(0);
    }

    // Build an undirected view of the graph; node weight is the entity id.
    let mut graph = Graph::<i64, (), petgraph::Undirected>::new_undirected();
    let mut node_map: HashMap<i64, NodeIndex> = HashMap::new();

    for entity in &entities {
        let idx = graph.add_node(entity.id);
        node_map.insert(entity.id, idx);
    }

    let edges: Vec<_> = store.all_active_edges_stream().try_collect().await?;
    for edge in &edges {
        if let (Some(&src_idx), Some(&tgt_idx)) = (
            node_map.get(&edge.source_entity_id),
            node_map.get(&edge.target_entity_id),
        ) {
            graph.add_edge(src_idx, tgt_idx, ());
        }
    }

    // Label propagation: every node starts with its own unique label, then
    // repeatedly adopts the most common label among its neighbors.
    let mut labels: Vec<usize> = (0..graph.node_count()).collect();

    for _ in 0..MAX_LABEL_PROPAGATION_ITERATIONS {
        let mut changed = false;
        for node_idx in graph.node_indices() {
            let neighbors: Vec<NodeIndex> = graph.neighbors(node_idx).collect();
            if neighbors.is_empty() {
                continue;
            }

            let mut freq: HashMap<usize, usize> = HashMap::new();
            for &nbr in &neighbors {
                *freq.entry(labels[nbr.index()]).or_insert(0) += 1;
            }

            let max_count = *freq.values().max().unwrap_or(&0);
            // Tie-break on the smallest label so the outcome does not depend
            // on HashMap iteration order.
            let best_label = freq
                .iter()
                .filter(|&(_, count)| *count == max_count)
                .map(|(&label, _)| label)
                .min()
                .unwrap_or(labels[node_idx.index()]);

            if labels[node_idx.index()] != best_label {
                labels[node_idx.index()] = best_label;
                changed = true;
            }
        }
        if !changed {
            break;
        }
    }

    // Group entity ids by their final label.
    let mut communities: HashMap<usize, Vec<i64>> = HashMap::new();
    for node_idx in graph.node_indices() {
        let entity_id = graph[node_idx];
        communities
            .entry(labels[node_idx.index()])
            .or_default()
            .push(entity_id);
    }

    // Singletons (isolated nodes) do not count as communities.
    communities.retain(|_, members| members.len() >= 2);

    let entity_name_map: HashMap<i64, &str> =
        entities.iter().map(|e| (e.id, e.name.as_str())).collect();

    // Index facts and edge ids by (source, target) for intra-community lookup.
    let mut edge_facts_map: HashMap<(i64, i64), Vec<String>> = HashMap::new();
    let mut edge_id_map: HashMap<(i64, i64), Vec<i64>> = HashMap::new();
    for edge in &edges {
        let key = (edge.source_entity_id, edge.target_entity_id);
        edge_facts_map
            .entry(key)
            .or_default()
            .push(edge.fact.clone());
        edge_id_map.entry(key).or_default().push(edge.id);
    }

    let stored_fingerprints = store.community_fingerprints().await?;

    // Iterate labels in sorted order so generated community names are stable
    // across runs.
    let mut sorted_labels: Vec<usize> = communities.keys().copied().collect();
    sorted_labels.sort_unstable();

    let mut to_summarize: Vec<CommunityData> = Vec::new();
    let mut unchanged_count = 0usize;
    let mut new_fingerprints: std::collections::HashSet<String> = std::collections::HashSet::new();

    for (label_index, &label) in sorted_labels.iter().enumerate() {
        let entity_ids = communities[&label].as_slice();
        let member_set: std::collections::HashSet<i64> = entity_ids.iter().copied().collect();

        // Collect facts/ids of edges whose endpoints are both members.
        let mut intra_facts: Vec<String> = Vec::new();
        let mut intra_edge_ids: Vec<i64> = Vec::new();
        for (&(src, tgt), facts) in &edge_facts_map {
            if member_set.contains(&src) && member_set.contains(&tgt) {
                intra_facts.extend(facts.iter().map(|f| scrub_content(f)));
                if let Some(ids) = edge_id_map.get(&(src, tgt)) {
                    intra_edge_ids.extend_from_slice(ids);
                }
            }
        }

        let fingerprint = compute_partition_fingerprint(entity_ids, &intra_edge_ids);
        new_fingerprints.insert(fingerprint.clone());

        // Unchanged partition: keep the stored summary, skip the LLM call.
        if stored_fingerprints.contains_key(&fingerprint) {
            unchanged_count += 1;
            continue;
        }

        let entity_names: Vec<String> = entity_ids
            .iter()
            .filter_map(|id| entity_name_map.get(id).map(|&s| scrub_content(s)))
            .collect();

        // Name = first three member names plus the label index for uniqueness.
        let base_name = entity_names
            .iter()
            .take(3)
            .cloned()
            .collect::<Vec<_>>()
            .join(", ");
        let name = format!("{base_name} [{label_index}]");

        to_summarize.push(CommunityData {
            entity_ids: entity_ids.to_vec(),
            entity_names,
            intra_facts,
            fingerprint,
            name,
        });
    }

    tracing::debug!(
        total = sorted_labels.len(),
        unchanged = unchanged_count,
        to_summarize = to_summarize.len(),
        "community detection: partition classification complete"
    );

    // Stored fingerprints that no current partition produced belong to
    // dissolved communities; delete them.
    for (stored_fp, community_id) in &stored_fingerprints {
        if !new_fingerprints.contains(stored_fp.as_str()) {
            store.delete_community_by_id(*community_id).await?;
        }
    }

    // Summarize concurrently; the semaphore bounds in-flight LLM calls.
    let semaphore = Arc::new(Semaphore::new(concurrency.max(1)));
    let mut join_set: JoinSet<(String, String, Vec<i64>, String)> = JoinSet::new();

    for data in to_summarize {
        let provider = provider.clone();
        let sem = Arc::clone(&semaphore);
        let max_bytes = community_summary_max_prompt_bytes;
        join_set.spawn(async move {
            let _permit = sem.acquire().await.expect("semaphore is never closed");
            let summary = match generate_community_summary(
                &provider,
                &data.entity_names,
                &data.intra_facts,
                max_bytes,
            )
            .await
            {
                Ok(text) => text,
                // Best-effort: a failed summary becomes an empty string
                // instead of aborting the whole detection run.
                Err(e) => {
                    tracing::warn!(
                        community = %data.name,
                        "community summary generation failed: {e:#}"
                    );
                    String::new()
                }
            };
            (data.name, summary, data.entity_ids, data.fingerprint)
        });
    }

    let mut results: Vec<(String, String, Vec<i64>, String)> = Vec::new();
    while let Some(outcome) = join_set.join_next().await {
        match outcome {
            Ok(tuple) => results.push(tuple),
            Err(e) => {
                tracing::error!(
                    panicked = e.is_panic(),
                    cancelled = e.is_cancelled(),
                    "community summary task failed"
                );
            }
        }
    }

    // Sort by name so upserts happen in deterministic order.
    results.sort_unstable_by(|a, b| a.0.cmp(&b.0));

    let mut count = unchanged_count;
    for (name, summary, entity_ids, fingerprint) in results {
        store
            .upsert_community(&name, &summary, &entity_ids, Some(&fingerprint))
            .await?;
        count += 1;
    }

    Ok(count)
}
348
/// Assigns a single entity to an existing community by majority vote of its
/// graph neighbors.
///
/// Counts how many neighbors belong to each community and picks the one with
/// the most votes. The winner gains the entity (if not already a member) and
/// its fingerprint is cleared so the next [`detect_communities`] run
/// re-summarizes it.
///
/// Returns `Ok(None)` when the entity has no edges, no neighbor belongs to a
/// community, or the winning community row no longer exists.
///
/// # Errors
/// Propagates storage errors.
pub async fn assign_to_community(
    store: &GraphStore,
    entity_id: i64,
) -> Result<Option<i64>, MemoryError> {
    let edges = store.edges_for_entity(entity_id).await?;
    if edges.is_empty() {
        return Ok(None);
    }

    // The "other" endpoint of each incident edge.
    let neighbor_ids: Vec<i64> = edges
        .iter()
        .map(|e| {
            if e.source_entity_id == entity_id {
                e.target_entity_id
            } else {
                e.source_entity_id
            }
        })
        .collect();

    let mut community_votes: HashMap<i64, usize> = HashMap::new();
    for &nbr_id in &neighbor_ids {
        if let Some(community) = store.community_for_entity(nbr_id).await? {
            *community_votes.entry(community.id).or_insert(0) += 1;
        }
    }

    if community_votes.is_empty() {
        return Ok(None);
    }

    // Highest vote count wins; on a tie the reversed id comparison makes the
    // smallest community id rank highest, so the choice is deterministic
    // despite HashMap iteration order.
    let Some((&best_community_id, _)) =
        community_votes
            .iter()
            .max_by(|&(&id_a, &count_a), &(&id_b, &count_b)| {
                count_a.cmp(&count_b).then(id_b.cmp(&id_a))
            })
    else {
        return Ok(None);
    };

    if let Some(mut target) = store.find_community_by_id(best_community_id).await? {
        if !target.entity_ids.contains(&entity_id) {
            target.entity_ids.push(entity_id);
            store
                .upsert_community(&target.name, &target.summary, &target.entity_ids, None)
                .await?;
            // Membership changed: invalidate the fingerprint so the next full
            // detection regenerates this community's summary.
            store.clear_community_fingerprint(best_community_id).await?;
        }
        return Ok(Some(best_community_id));
    }

    Ok(None)
}
416
/// Placeholder for cleaning up embeddings of entities that no longer exist
/// (presumed from the name — the body does no work yet).
///
/// Currently a no-op that always reports zero deletions; the parameters are
/// kept so callers can already wire through both stores.
/// NOTE(review): implement or remove — confirm the intended follow-up.
pub async fn cleanup_stale_entity_embeddings(
    _store: &GraphStore,
    _embeddings: &crate::embedding_store::EmbeddingStore,
) -> Result<usize, MemoryError> {
    Ok(0)
}
434
435pub async fn run_graph_eviction(
441 store: &GraphStore,
442 expired_edge_retention_days: u32,
443 max_entities: usize,
444) -> Result<GraphEvictionStats, MemoryError> {
445 let expired_edges_deleted = store
446 .delete_expired_edges(expired_edge_retention_days)
447 .await?;
448 let orphan_entities_deleted = store
449 .delete_orphan_entities(expired_edge_retention_days)
450 .await?;
451 let capped_entities_deleted = if max_entities > 0 {
452 store.cap_entities(max_entities).await?
453 } else {
454 0
455 };
456
457 Ok(GraphEvictionStats {
458 expired_edges_deleted,
459 orphan_entities_deleted,
460 capped_entities_deleted,
461 })
462}
463
464async fn generate_community_summary(
465 provider: &AnyProvider,
466 entity_names: &[String],
467 edge_facts: &[String],
468 max_prompt_bytes: usize,
469) -> Result<String, MemoryError> {
470 let entities_str = entity_names.join(", ");
471 let facts_str = edge_facts
473 .iter()
474 .take(20)
475 .map(|f| format!("- {f}"))
476 .collect::<Vec<_>>()
477 .join("\n");
478
479 let raw_prompt = format!(
480 "Summarize the following group of related entities and their relationships \
481 into a single paragraph (2-3 sentences). Focus on the theme that connects \
482 them and the key relationships.\n\nEntities: {entities_str}\n\
483 Relationships:\n{facts_str}\n\nSummary:"
484 );
485
486 let original_bytes = raw_prompt.len();
487 let truncated = raw_prompt.len() > max_prompt_bytes;
488 let prompt = truncate_prompt(raw_prompt, max_prompt_bytes);
489 if prompt.is_empty() {
490 return Ok(String::new());
491 }
492 if truncated {
493 tracing::warn!(
494 entity_count = entity_names.len(),
495 original_bytes,
496 truncated_bytes = prompt.len(),
497 "community summary prompt truncated"
498 );
499 }
500
501 let messages = [Message::from_legacy(Role::User, prompt)];
502 let response: String = provider.chat(&messages).await.map_err(MemoryError::Llm)?;
503 Ok(response)
504}
505
506#[cfg(test)]
507mod tests {
508 use std::sync::{Arc, Mutex};
509
510 use super::*;
511 use crate::graph::types::EntityType;
512 use crate::sqlite::SqliteStore;
513
    // Fresh in-memory SQLite-backed graph store for each test.
    async fn setup() -> GraphStore {
        let store = SqliteStore::new(":memory:").await.unwrap();
        GraphStore::new(store.pool().clone())
    }

    // LLM provider stub that returns canned responses.
    fn mock_provider() -> AnyProvider {
        AnyProvider::Mock(zeph_llm::mock::MockProvider::default())
    }

    // Mock provider plus a shared buffer capturing every chat() call,
    // used to count LLM invocations in the incremental-detection tests.
    fn recording_provider() -> (
        AnyProvider,
        Arc<Mutex<Vec<Vec<zeph_llm::provider::Message>>>>,
    ) {
        let (mock, buf) = zeph_llm::mock::MockProvider::default().with_recording();
        (AnyProvider::Mock(mock), buf)
    }
530
    // An empty graph yields zero communities.
    #[tokio::test]
    async fn test_detect_communities_empty_graph() {
        let store = setup().await;
        let provider = mock_provider();
        let count = detect_communities(&store, &provider, usize::MAX, 4)
            .await
            .unwrap();
        assert_eq!(count, 0);
    }

    // Fewer than two entities short-circuits detection.
    #[tokio::test]
    async fn test_detect_communities_single_entity() {
        let store = setup().await;
        let provider = mock_provider();
        store
            .upsert_entity("Solo", "Solo", EntityType::Concept, None)
            .await
            .unwrap();
        let count = detect_communities(&store, &provider, usize::MAX, 4)
            .await
            .unwrap();
        assert_eq!(count, 0, "single isolated entity must not form a community");
    }
554
    // A connected triple forms one community; an isolated node is excluded
    // (the members.len() >= 2 filter).
    #[tokio::test]
    async fn test_single_entity_community_filtered() {
        let store = setup().await;
        let provider = mock_provider();

        let a = store
            .upsert_entity("A", "A", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("B", "B", EntityType::Concept, None)
            .await
            .unwrap();
        let c = store
            .upsert_entity("C", "C", EntityType::Concept, None)
            .await
            .unwrap();
        let _iso = store
            .upsert_entity("Isolated", "Isolated", EntityType::Concept, None)
            .await
            .unwrap();

        store
            .insert_edge(a, b, "r", "A relates B", 1.0, None)
            .await
            .unwrap();
        store
            .insert_edge(b, c, "r", "B relates C", 1.0, None)
            .await
            .unwrap();

        let count = detect_communities(&store, &provider, usize::MAX, 4)
            .await
            .unwrap();
        assert_eq!(count, 1, "only the 3-entity cluster should be detected");

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 1);
        assert!(
            !communities[0].entity_ids.contains(&_iso),
            "isolated entity must not be in any community"
        );
    }
600
    // Four disjoint 3-node chains must resolve to exactly four communities,
    // with each chain intact in a single community.
    #[tokio::test]
    async fn test_label_propagation_basic() {
        let store = setup().await;
        let provider = mock_provider();

        let mut cluster_ids: Vec<Vec<i64>> = Vec::new();
        for cluster in 0..4_i64 {
            let mut ids = Vec::new();
            for node in 0..3_i64 {
                let name = format!("c{cluster}_n{node}");
                let id = store
                    .upsert_entity(&name, &name, EntityType::Concept, None)
                    .await
                    .unwrap();
                ids.push(id);
            }
            // Chain topology: n0 - n1 - n2.
            store
                .insert_edge(ids[0], ids[1], "r", "f", 1.0, None)
                .await
                .unwrap();
            store
                .insert_edge(ids[1], ids[2], "r", "f", 1.0, None)
                .await
                .unwrap();
            cluster_ids.push(ids);
        }

        let count = detect_communities(&store, &provider, usize::MAX, 4)
            .await
            .unwrap();
        assert_eq!(count, 4, "expected 4 communities, one per cluster");

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 4);

        for ids in &cluster_ids {
            let found = communities
                .iter()
                .filter(|c| ids.iter().any(|id| c.entity_ids.contains(id)))
                .count();
            assert_eq!(
                found, 1,
                "all nodes of a cluster must be in the same community"
            );
        }
    }
650
    // A graph with entities but no edges must produce no communities at all.
    #[tokio::test]
    async fn test_all_isolated_nodes() {
        let store = setup().await;
        let provider = mock_provider();

        for i in 0..5_i64 {
            store
                .upsert_entity(
                    &format!("iso_{i}"),
                    &format!("iso_{i}"),
                    EntityType::Concept,
                    None,
                )
                .await
                .unwrap();
        }

        let count = detect_communities(&store, &provider, usize::MAX, 4)
            .await
            .unwrap();
        assert_eq!(count, 0, "zero-edge graph must produce no communities");
        assert_eq!(store.community_count().await.unwrap(), 0);
    }
675
    // An edge expired well past the retention window is hard-deleted.
    #[tokio::test]
    async fn test_eviction_expired_edges() {
        let store = setup().await;

        let a = store
            .upsert_entity("EA", "EA", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("EB", "EB", EntityType::Concept, None)
            .await
            .unwrap();
        let edge_id = store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
        store.invalidate_edge(edge_id).await.unwrap();

        // Backdate the expiry beyond the 90-day retention used below.
        sqlx::query(
            "UPDATE graph_edges SET expired_at = datetime('now', '-200 days') WHERE id = ?1",
        )
        .bind(edge_id)
        .execute(store.pool())
        .await
        .unwrap();

        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
        assert_eq!(stats.expired_edges_deleted, 1);
    }

    // An edgeless entity not seen within the retention window is deleted.
    #[tokio::test]
    async fn test_eviction_orphan_entities() {
        let store = setup().await;

        let iso = store
            .upsert_entity("Orphan", "Orphan", EntityType::Concept, None)
            .await
            .unwrap();

        // Backdate last_seen_at beyond the retention window.
        sqlx::query(
            "UPDATE graph_entities SET last_seen_at = datetime('now', '-200 days') WHERE id = ?1",
        )
        .bind(iso)
        .execute(store.pool())
        .await
        .unwrap();

        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
        assert_eq!(stats.orphan_entities_deleted, 1);
    }

    // The entity cap trims the table down to max_entities.
    #[tokio::test]
    async fn test_eviction_entity_cap() {
        let store = setup().await;

        for i in 0..5_i64 {
            let name = format!("cap_entity_{i}");
            store
                .upsert_entity(&name, &name, EntityType::Concept, None)
                .await
                .unwrap();
        }

        let stats = run_graph_eviction(&store, 90, 3).await.unwrap();
        assert_eq!(
            stats.capped_entities_deleted, 2,
            "should delete 5-3=2 entities"
        );
        assert_eq!(store.entity_count().await.unwrap(), 3);
    }
746
    // An entity with no edges cannot be assigned to any community.
    #[tokio::test]
    async fn test_assign_to_community_no_neighbors() {
        let store = setup().await;
        let entity_id = store
            .upsert_entity("Loner", "Loner", EntityType::Concept, None)
            .await
            .unwrap();

        let result = assign_to_community(&store, entity_id).await.unwrap();
        assert!(result.is_none());
    }
758
    // Metadata written through one store handle must be visible via a second
    // handle opened on the same database file.
    #[tokio::test]
    async fn test_extraction_count_persistence() {
        use tempfile::NamedTempFile;
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_owned();

        let store1 = {
            let s = crate::sqlite::SqliteStore::new(&path).await.unwrap();
            GraphStore::new(s.pool().clone())
        };

        store1.set_metadata("extraction_count", "0").await.unwrap();
        for i in 1..=5_i64 {
            store1
                .set_metadata("extraction_count", &i.to_string())
                .await
                .unwrap();
        }

        let store2 = {
            let s = crate::sqlite::SqliteStore::new(&path).await.unwrap();
            GraphStore::new(s.pool().clone())
        };
        assert_eq!(store2.extraction_count().await.unwrap(), 5);
    }
786
    // Control characters (including \n, \r, \t) are stripped.
    #[test]
    fn test_scrub_content_ascii_control() {
        let input = "hello\nworld\r\x00\x01\x09end";
        assert_eq!(scrub_content(input), "helloworldend");
    }

    // Bidi override and isolate controls are stripped.
    #[test]
    fn test_scrub_content_bidi_overrides() {
        let input = format!("safe\u{202A}inject\u{202E}end\u{2066}iso\u{2069}done");
        assert_eq!(scrub_content(&input), "safeinjectendisodone");
    }

    // Zero-width characters (U+200B..U+200F) are stripped.
    #[test]
    fn test_scrub_content_zero_width() {
        let input = format!("a\u{200B}b\u{200C}c\u{200D}d\u{200F}e");
        assert_eq!(scrub_content(&input), "abcde");
    }

    // The byte-order mark is stripped.
    #[test]
    fn test_scrub_content_bom() {
        let input = format!("\u{FEFF}hello");
        assert_eq!(scrub_content(&input), "hello");
    }

    // Ordinary printable text (including non-ASCII punctuation) passes through.
    #[test]
    fn test_scrub_content_clean_string_unchanged() {
        let input = "Hello, World! 123 — normal text.";
        assert_eq!(scrub_content(input), input);
    }
822
    // A prompt within the limit is returned unchanged.
    #[test]
    fn test_truncate_prompt_within_limit() {
        let result = truncate_prompt("short".into(), 100);
        assert_eq!(result, "short");
    }

    // A zero byte budget yields an empty string, not "...".
    #[test]
    fn test_truncate_prompt_zero_max_bytes() {
        let result = truncate_prompt("hello".into(), 0);
        assert_eq!(result, "");
    }

    // Over-limit prompts are cut and suffixed; the "..." may push the result
    // up to 3 bytes past the limit.
    #[test]
    fn test_truncate_prompt_long_facts() {
        let facts: Vec<String> = (0..20)
            .map(|i| format!("fact_{i}_{}", "x".repeat(20)))
            .collect();
        let prompt = facts.join("\n");
        let result = truncate_prompt(prompt, 200);
        assert!(
            result.ends_with("..."),
            "truncated prompt must end with '...'"
        );
        assert!(result.len() <= 203);
        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
    }

    // The cut must land on a char boundary: 10 bytes of 4-byte emojis
    // floors to 8 bytes (two emojis).
    #[test]
    fn test_truncate_prompt_utf8_boundary() {
        let prompt = "🔥".repeat(100);
        let result = truncate_prompt(prompt, 10);
        assert!(
            result.ends_with("..."),
            "truncated prompt must end with '...'"
        );
        assert_eq!(result.len(), 8 + 3, "2 emojis (8 bytes) + '...' (3 bytes)");
        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
    }
864
    // An entity whose neighbors all belong to one community joins that
    // community, and the community's fingerprint is cleared.
    #[tokio::test]
    async fn test_assign_to_community_majority_vote() {
        let store = setup().await;

        let a = store
            .upsert_entity("AA", "AA", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("BB", "BB", EntityType::Concept, None)
            .await
            .unwrap();
        let d = store
            .upsert_entity("DD", "DD", EntityType::Concept, None)
            .await
            .unwrap();

        store
            .upsert_community("test_community", "summary", &[a, b], None)
            .await
            .unwrap();

        // D is linked to both existing members.
        store.insert_edge(d, a, "r", "f", 1.0, None).await.unwrap();
        store.insert_edge(d, b, "r", "f", 1.0, None).await.unwrap();

        let result = assign_to_community(&store, d).await.unwrap();
        assert!(result.is_some());

        let returned_id = result.unwrap();
        let community = store
            .find_community_by_id(returned_id)
            .await
            .unwrap()
            .expect("returned community_id must reference an existing row");
        assert!(
            community.entity_ids.contains(&d),
            "D should be added to the community"
        );
        assert!(
            community.fingerprint.is_none(),
            "fingerprint must be cleared after assign_to_community"
        );
    }
911
    // A second detection run over an unchanged graph must reuse stored
    // fingerprints and make zero additional LLM calls.
    #[tokio::test]
    async fn test_incremental_detection_no_changes_skips_llm() {
        let store = setup().await;
        let (provider, call_buf) = recording_provider();

        let a = store
            .upsert_entity("X", "X", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("Y", "Y", EntityType::Concept, None)
            .await
            .unwrap();
        store
            .insert_edge(a, b, "r", "X relates Y", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4)
            .await
            .unwrap();
        let first_calls = call_buf.lock().unwrap().len();
        assert_eq!(first_calls, 1, "first run must produce exactly 1 LLM call");

        detect_communities(&store, &provider, usize::MAX, 4)
            .await
            .unwrap();
        let second_calls = call_buf.lock().unwrap().len();
        assert_eq!(
            second_calls, first_calls,
            "second run with no graph changes must produce 0 additional LLM calls"
        );
    }

    // Adding an edge changes the partition fingerprint, so the community
    // must be re-summarized (exactly one extra LLM call).
    #[tokio::test]
    async fn test_incremental_detection_edge_change_triggers_resummary() {
        let store = setup().await;
        let (provider, call_buf) = recording_provider();

        let a = store
            .upsert_entity("P", "P", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("Q", "Q", EntityType::Concept, None)
            .await
            .unwrap();
        store
            .insert_edge(a, b, "r", "P relates Q", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4)
            .await
            .unwrap();
        let after_first = call_buf.lock().unwrap().len();
        assert_eq!(after_first, 1);

        store
            .insert_edge(b, a, "r2", "Q also relates P", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4)
            .await
            .unwrap();
        let after_second = call_buf.lock().unwrap().len();
        assert_eq!(
            after_second, 2,
            "edge change must trigger one additional LLM call"
        );
    }
989
    // Invalidating the only connecting edge dissolves the community; the next
    // detection run must delete the stored row.
    #[tokio::test]
    async fn test_incremental_detection_dissolved_community_deleted() {
        let store = setup().await;
        let provider = mock_provider();

        let a = store
            .upsert_entity("M1", "M1", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("M2", "M2", EntityType::Concept, None)
            .await
            .unwrap();
        let edge_id = store
            .insert_edge(a, b, "r", "M1 relates M2", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4)
            .await
            .unwrap();
        assert_eq!(store.community_count().await.unwrap(), 1);

        store.invalidate_edge(edge_id).await.unwrap();

        detect_communities(&store, &provider, usize::MAX, 4)
            .await
            .unwrap();
        assert_eq!(
            store.community_count().await.unwrap(),
            0,
            "dissolved community must be deleted on next refresh"
        );
    }

    // concurrency=1 (one semaphore permit) must still summarize everything.
    #[tokio::test]
    async fn test_detect_communities_concurrency_one() {
        let store = setup().await;
        let provider = mock_provider();

        let a = store
            .upsert_entity("C1A", "C1A", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("C1B", "C1B", EntityType::Concept, None)
            .await
            .unwrap();
        store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();

        let count = detect_communities(&store, &provider, usize::MAX, 1)
            .await
            .unwrap();
        assert_eq!(count, 1, "concurrency=1 must still detect the community");
        assert_eq!(store.community_count().await.unwrap(), 1);
    }
1049
    // Fingerprints must ignore input order but be sensitive to membership.
    #[test]
    fn test_compute_fingerprint_deterministic() {
        let fp1 = compute_partition_fingerprint(&[1, 2, 3], &[10, 20]);
        let fp2 = compute_partition_fingerprint(&[3, 1, 2], &[20, 10]);
        assert_eq!(fp1, fp2, "fingerprint must be order-independent");

        let fp3 = compute_partition_fingerprint(&[1, 2, 3], &[10, 30]);
        assert_ne!(
            fp1, fp3,
            "different edge IDs must produce different fingerprint"
        );

        let fp4 = compute_partition_fingerprint(&[1, 2, 4], &[10, 20]);
        assert_ne!(
            fp1, fp4,
            "different entity IDs must produce different fingerprint"
        );
    }

    // Moving an id between the entity list and the edge list must change the
    // fingerprint (the domain tags keep the two sequences separate).
    #[test]
    fn test_compute_fingerprint_domain_separation() {
        let fp_a = compute_partition_fingerprint(&[1, 2], &[3]);
        let fp_b = compute_partition_fingerprint(&[1], &[2, 3]);
        assert_ne!(
            fp_a, fp_b,
            "entity/edge sequences with same raw bytes must produce different fingerprints"
        );
    }
1082}