1use std::collections::HashMap;
5use std::sync::Arc;
6
7use futures::TryStreamExt as _;
8use petgraph::Graph;
9use petgraph::graph::NodeIndex;
10use tokio::sync::Semaphore;
11use tokio::task::JoinSet;
12use zeph_llm::LlmProvider as _;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{Message, Role};
15
16use crate::error::MemoryError;
17
18use super::store::GraphStore;
19
/// Upper bound on the number of label-propagation passes performed by community detection.
/// Propagation stops earlier as soon as a full pass changes no label.
const MAX_LABEL_PROPAGATION_ITERATIONS: usize = 50;
21
/// Removes control characters and invisible / layout-altering Unicode from `s`.
///
/// Used on entity names and edge facts before they are placed into an LLM prompt, so that
/// hidden line breaks, bidi overrides, or zero-width characters cannot smuggle structure
/// into the prompt. Stripped characters:
///
/// * every `char::is_control` character (e.g. `\n`, `\r`, `\t`, NUL);
/// * U+061C ARABIC LETTER MARK, U+200E/U+200F, U+202A..=U+202E and U+2066..=U+2069
///   (bidirectional formatting controls);
/// * U+200B..=U+200D, U+2060..=U+2064 and U+FEFF (zero-width / invisible format characters);
/// * U+2028 LINE SEPARATOR and U+2029 PARAGRAPH SEPARATOR (line breaks that are not `Cc`,
///   stripped for consistency with `\n`/`\r`).
///
/// All other characters are kept in their original order.
fn scrub_content(s: &str) -> String {
    s.chars()
        .filter(|&c| {
            !c.is_control()
                && !matches!(
                    c,
                    '\u{061C}'
                        | '\u{200B}'..='\u{200F}'
                        | '\u{2028}'..='\u{2029}'
                        | '\u{202A}'..='\u{202E}'
                        | '\u{2060}'..='\u{2064}'
                        | '\u{2066}'..='\u{2069}'
                        | '\u{FEFF}'
                )
        })
        .collect()
}
40
/// Per-category deletion counts reported by [`run_graph_eviction`].
///
/// Plain counters, so the type is `Copy` and comparable, which lets callers and tests
/// copy and `assert_eq!` whole stats values.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct GraphEvictionStats {
    /// Count returned by `GraphStore::delete_expired_edges`.
    pub expired_edges_deleted: usize,
    /// Count returned by `GraphStore::delete_orphan_entities`.
    pub orphan_entities_deleted: usize,
    /// Count returned by `GraphStore::cap_entities`; `0` when no entity cap is configured.
    pub capped_entities_deleted: usize,
}
48
/// Shortens `prompt` to at most `max_bytes` bytes of content without splitting a UTF-8 character.
///
/// * `max_bytes == 0` yields an empty string.
/// * A prompt that already fits is returned unchanged.
/// * Otherwise the prompt is cut at the last character boundary at or before `max_bytes` and
///   `"..."` is appended, so the result may be up to 3 bytes longer than `max_bytes`.
fn truncate_prompt(prompt: String, max_bytes: usize) -> String {
    if max_bytes == 0 {
        return String::new();
    }
    if prompt.len() <= max_bytes {
        return prompt;
    }

    // Walk back from the byte budget to the nearest char boundary; index 0 always qualifies.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| prompt.is_char_boundary(idx))
        .unwrap_or(0);

    let mut shortened = prompt;
    shortened.truncate(cut);
    shortened.push_str("...");
    shortened
}
64
65fn compute_partition_fingerprint(entity_ids: &[i64], intra_edge_ids: &[i64]) -> String {
71 let mut hasher = blake3::Hasher::new();
72 let mut sorted_entities = entity_ids.to_vec();
73 sorted_entities.sort_unstable();
74 hasher.update(b"entities");
75 for id in &sorted_entities {
76 hasher.update(&id.to_le_bytes());
77 }
78 let mut sorted_edges = intra_edge_ids.to_vec();
79 sorted_edges.sort_unstable();
80 hasher.update(b"edges");
81 for id in &sorted_edges {
82 hasher.update(&id.to_le_bytes());
83 }
84 hasher.finalize().to_hex().to_string()
85}
86
/// A new or changed community that still needs an LLM summary.
struct CommunityData {
    /// Display name: up to three member names plus the community's position, e.g. `"A, B [0]"`.
    name: String,
    /// Partition fingerprint from `compute_partition_fingerprint`, stored alongside the summary.
    fingerprint: String,
    /// Member entity ids.
    entity_ids: Vec<i64>,
    /// Scrubbed names of the member entities.
    entity_names: Vec<String>,
    /// Scrubbed facts of edges whose endpoints are both members.
    intra_facts: Vec<String>,
}
95
96#[allow(clippy::too_many_lines)]
115pub async fn detect_communities(
116 store: &GraphStore,
117 provider: &AnyProvider,
118 community_summary_max_prompt_bytes: usize,
119 concurrency: usize,
120 edge_chunk_size: usize,
121) -> Result<usize, MemoryError> {
122 let entities = store.all_entities().await?;
123 if entities.len() < 2 {
124 return Ok(0);
125 }
126
127 let mut graph = Graph::<i64, (), petgraph::Undirected>::new_undirected();
132 let mut node_map: HashMap<i64, NodeIndex> = HashMap::new();
133
134 for entity in &entities {
135 let idx = graph.add_node(entity.id);
136 node_map.insert(entity.id, idx);
137 }
138
139 let mut edge_facts_map: HashMap<(i64, i64), Vec<String>> = HashMap::new();
144 let mut edge_id_map: HashMap<(i64, i64), Vec<i64>> = HashMap::new();
145
146 if edge_chunk_size == 0 {
147 let edges: Vec<_> = store.all_active_edges_stream().try_collect().await?;
149 for edge in &edges {
150 if let (Some(&src_idx), Some(&tgt_idx)) = (
151 node_map.get(&edge.source_entity_id),
152 node_map.get(&edge.target_entity_id),
153 ) {
154 graph.add_edge(src_idx, tgt_idx, ());
155 }
156 let key = (edge.source_entity_id, edge.target_entity_id);
157 edge_facts_map
158 .entry(key)
159 .or_default()
160 .push(edge.fact.clone());
161 edge_id_map.entry(key).or_default().push(edge.id);
162 }
163 } else {
164 let limit = i64::try_from(edge_chunk_size).unwrap_or(i64::MAX);
165 let mut last_id: i64 = 0;
166 loop {
167 let chunk = store.edges_after_id(last_id, limit).await?;
168 if chunk.is_empty() {
169 break;
170 }
171 last_id = chunk.last().expect("non-empty chunk has a last element").id;
173 for edge in &chunk {
174 if let (Some(&src_idx), Some(&tgt_idx)) = (
175 node_map.get(&edge.source_entity_id),
176 node_map.get(&edge.target_entity_id),
177 ) {
178 graph.add_edge(src_idx, tgt_idx, ());
179 }
180 let key = (edge.source_entity_id, edge.target_entity_id);
181 edge_facts_map
182 .entry(key)
183 .or_default()
184 .push(edge.fact.clone());
185 edge_id_map.entry(key).or_default().push(edge.id);
186 }
187 }
189 }
190
191 let mut labels: Vec<usize> = (0..graph.node_count()).collect();
193
194 for _ in 0..MAX_LABEL_PROPAGATION_ITERATIONS {
195 let mut changed = false;
196 for node_idx in graph.node_indices() {
197 let neighbors: Vec<NodeIndex> = graph.neighbors(node_idx).collect();
198 if neighbors.is_empty() {
199 continue;
200 }
201
202 let mut freq: HashMap<usize, usize> = HashMap::new();
203 for &nbr in &neighbors {
204 *freq.entry(labels[nbr.index()]).or_insert(0) += 1;
205 }
206
207 let max_count = *freq.values().max().unwrap_or(&0);
209 let best_label = freq
211 .iter()
212 .filter(|&(_, count)| *count == max_count)
213 .map(|(&label, _)| label)
214 .min()
215 .unwrap_or(labels[node_idx.index()]);
216
217 if labels[node_idx.index()] != best_label {
218 labels[node_idx.index()] = best_label;
219 changed = true;
220 }
221 }
222 if !changed {
223 break;
224 }
225 }
226
227 let mut communities: HashMap<usize, Vec<i64>> = HashMap::new();
229 for node_idx in graph.node_indices() {
230 let entity_id = graph[node_idx];
231 communities
232 .entry(labels[node_idx.index()])
233 .or_default()
234 .push(entity_id);
235 }
236
237 communities.retain(|_, members| members.len() >= 2);
239
240 let entity_name_map: HashMap<i64, &str> =
242 entities.iter().map(|e| (e.id, e.name.as_str())).collect();
243
244 let stored_fingerprints = store.community_fingerprints().await?;
246
247 let mut sorted_labels: Vec<usize> = communities.keys().copied().collect();
249 sorted_labels.sort_unstable();
250
251 let mut to_summarize: Vec<CommunityData> = Vec::new();
253 let mut unchanged_count = 0usize;
254 let mut new_fingerprints: std::collections::HashSet<String> = std::collections::HashSet::new();
255
256 for (label_index, &label) in sorted_labels.iter().enumerate() {
257 let entity_ids = communities[&label].as_slice();
258 let member_set: std::collections::HashSet<i64> = entity_ids.iter().copied().collect();
259
260 let mut intra_facts: Vec<String> = Vec::new();
261 let mut intra_edge_ids: Vec<i64> = Vec::new();
262 for (&(src, tgt), facts) in &edge_facts_map {
263 if member_set.contains(&src) && member_set.contains(&tgt) {
264 intra_facts.extend(facts.iter().map(|f| scrub_content(f)));
265 if let Some(ids) = edge_id_map.get(&(src, tgt)) {
266 intra_edge_ids.extend_from_slice(ids);
267 }
268 }
269 }
270
271 let fingerprint = compute_partition_fingerprint(entity_ids, &intra_edge_ids);
272 new_fingerprints.insert(fingerprint.clone());
273
274 if stored_fingerprints.contains_key(&fingerprint) {
275 unchanged_count += 1;
277 continue;
278 }
279
280 let entity_names: Vec<String> = entity_ids
281 .iter()
282 .filter_map(|id| entity_name_map.get(id).map(|&s| scrub_content(s)))
283 .collect();
284
285 let base_name = entity_names
288 .iter()
289 .take(3)
290 .cloned()
291 .collect::<Vec<_>>()
292 .join(", ");
293 let name = format!("{base_name} [{label_index}]");
294
295 to_summarize.push(CommunityData {
296 entity_ids: entity_ids.to_vec(),
297 entity_names,
298 intra_facts,
299 fingerprint,
300 name,
301 });
302 }
303
304 tracing::debug!(
305 total = sorted_labels.len(),
306 unchanged = unchanged_count,
307 to_summarize = to_summarize.len(),
308 "community detection: partition classification complete"
309 );
310
311 for (stored_fp, community_id) in &stored_fingerprints {
314 if !new_fingerprints.contains(stored_fp.as_str()) {
315 store.delete_community_by_id(*community_id).await?;
316 }
317 }
318
319 let semaphore = Arc::new(Semaphore::new(concurrency.max(1)));
321 let mut join_set: JoinSet<(String, String, Vec<i64>, String)> = JoinSet::new();
322
323 for data in to_summarize {
324 let provider = provider.clone();
325 let sem = Arc::clone(&semaphore);
326 let max_bytes = community_summary_max_prompt_bytes;
327 join_set.spawn(async move {
328 let _permit = sem.acquire().await.expect("semaphore is never closed");
329 let summary = match generate_community_summary(
330 &provider,
331 &data.entity_names,
332 &data.intra_facts,
333 max_bytes,
334 )
335 .await
336 {
337 Ok(text) => text,
338 Err(e) => {
339 tracing::warn!(
340 community = %data.name,
341 "community summary generation failed: {e:#}"
342 );
343 String::new()
344 }
345 };
346 (data.name, summary, data.entity_ids, data.fingerprint)
347 });
348 }
349
350 let mut results: Vec<(String, String, Vec<i64>, String)> = Vec::new();
352 while let Some(outcome) = join_set.join_next().await {
353 match outcome {
354 Ok(tuple) => results.push(tuple),
355 Err(e) => {
356 tracing::error!(
357 panicked = e.is_panic(),
358 cancelled = e.is_cancelled(),
359 "community summary task failed"
360 );
361 }
362 }
363 }
364
365 results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
367
368 let mut count = unchanged_count;
370 for (name, summary, entity_ids, fingerprint) in results {
371 store
372 .upsert_community(&name, &summary, &entity_ids, Some(&fingerprint))
373 .await?;
374 count += 1;
375 }
376
377 Ok(count)
378}
379
380pub async fn assign_to_community(
391 store: &GraphStore,
392 entity_id: i64,
393) -> Result<Option<i64>, MemoryError> {
394 let edges = store.edges_for_entity(entity_id).await?;
395 if edges.is_empty() {
396 return Ok(None);
397 }
398
399 let neighbor_ids: Vec<i64> = edges
400 .iter()
401 .map(|e| {
402 if e.source_entity_id == entity_id {
403 e.target_entity_id
404 } else {
405 e.source_entity_id
406 }
407 })
408 .collect();
409
410 let mut community_votes: HashMap<i64, usize> = HashMap::new();
411 for &nbr_id in &neighbor_ids {
412 if let Some(community) = store.community_for_entity(nbr_id).await? {
413 *community_votes.entry(community.id).or_insert(0) += 1;
414 }
415 }
416
417 if community_votes.is_empty() {
418 return Ok(None);
419 }
420
421 let Some((&best_community_id, _)) =
424 community_votes
425 .iter()
426 .max_by(|&(&id_a, &count_a), &(&id_b, &count_b)| {
427 count_a.cmp(&count_b).then(id_b.cmp(&id_a))
428 })
429 else {
430 return Ok(None);
431 };
432
433 if let Some(mut target) = store.find_community_by_id(best_community_id).await? {
434 if !target.entity_ids.contains(&entity_id) {
435 target.entity_ids.push(entity_id);
436 store
437 .upsert_community(&target.name, &target.summary, &target.entity_ids, None)
438 .await?;
439 store.clear_community_fingerprint(best_community_id).await?;
441 }
442 return Ok(Some(best_community_id));
443 }
444
445 Ok(None)
446}
447
448pub async fn cleanup_stale_entity_embeddings(
456 _store: &GraphStore,
457 _embeddings: &crate::embedding_store::EmbeddingStore,
458) -> Result<usize, MemoryError> {
459 Ok(0)
464}
465
466pub async fn run_graph_eviction(
472 store: &GraphStore,
473 expired_edge_retention_days: u32,
474 max_entities: usize,
475) -> Result<GraphEvictionStats, MemoryError> {
476 let expired_edges_deleted = store
477 .delete_expired_edges(expired_edge_retention_days)
478 .await?;
479 let orphan_entities_deleted = store
480 .delete_orphan_entities(expired_edge_retention_days)
481 .await?;
482 let capped_entities_deleted = if max_entities > 0 {
483 store.cap_entities(max_entities).await?
484 } else {
485 0
486 };
487
488 Ok(GraphEvictionStats {
489 expired_edges_deleted,
490 orphan_entities_deleted,
491 capped_entities_deleted,
492 })
493}
494
495async fn generate_community_summary(
496 provider: &AnyProvider,
497 entity_names: &[String],
498 edge_facts: &[String],
499 max_prompt_bytes: usize,
500) -> Result<String, MemoryError> {
501 let entities_str = entity_names.join(", ");
502 let facts_str = edge_facts
504 .iter()
505 .take(20)
506 .map(|f| format!("- {f}"))
507 .collect::<Vec<_>>()
508 .join("\n");
509
510 let raw_prompt = format!(
511 "Summarize the following group of related entities and their relationships \
512 into a single paragraph (2-3 sentences). Focus on the theme that connects \
513 them and the key relationships.\n\nEntities: {entities_str}\n\
514 Relationships:\n{facts_str}\n\nSummary:"
515 );
516
517 let original_bytes = raw_prompt.len();
518 let truncated = raw_prompt.len() > max_prompt_bytes;
519 let prompt = truncate_prompt(raw_prompt, max_prompt_bytes);
520 if prompt.is_empty() {
521 return Ok(String::new());
522 }
523 if truncated {
524 tracing::warn!(
525 entity_count = entity_names.len(),
526 original_bytes,
527 truncated_bytes = prompt.len(),
528 "community summary prompt truncated"
529 );
530 }
531
532 let messages = [Message::from_legacy(Role::User, prompt)];
533 let response: String = provider.chat(&messages).await.map_err(MemoryError::Llm)?;
534 Ok(response)
535}
536
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;
    use crate::graph::types::EntityType;
    use crate::sqlite::SqliteStore;

    /// Opens (creating if needed) a SQLite database at `path` and wraps its pool in a `GraphStore`.
    async fn open_store(path: &str) -> GraphStore {
        let sqlite = SqliteStore::new(path).await.unwrap();
        GraphStore::new(sqlite.pool().clone())
    }

    /// Fresh in-memory graph store.
    async fn setup() -> GraphStore {
        open_store(":memory:").await
    }

    /// Mock LLM provider without call recording.
    fn mock_provider() -> AnyProvider {
        AnyProvider::Mock(zeph_llm::mock::MockProvider::default())
    }

    /// Mock LLM provider plus the shared buffer it records every request's messages into.
    fn recording_provider() -> (
        AnyProvider,
        Arc<Mutex<Vec<Vec<zeph_llm::provider::Message>>>>,
    ) {
        let (mock, calls) = zeph_llm::mock::MockProvider::default().with_recording();
        (AnyProvider::Mock(mock), calls)
    }

    /// Number of LLM requests recorded so far.
    fn recorded(calls: &Mutex<Vec<Vec<zeph_llm::provider::Message>>>) -> usize {
        calls.lock().unwrap().len()
    }

    /// Upserts a `Concept` entity using `name` for both name arguments; returns its id.
    async fn add_entity(store: &GraphStore, name: &str) -> i64 {
        store
            .upsert_entity(name, name, EntityType::Concept, None)
            .await
            .unwrap()
    }

    /// Inserts an edge with the `1.0` / `None` trailing arguments every test uses; returns its id.
    async fn add_edge(store: &GraphStore, source: i64, target: i64, relation: &str, fact: &str) -> i64 {
        store
            .insert_edge(source, target, relation, fact, 1.0, None)
            .await
            .unwrap()
    }

    /// Runs `detect_communities` with an unlimited prompt budget and unwraps the count.
    async fn detect(
        store: &GraphStore,
        provider: &AnyProvider,
        concurrency: usize,
        edge_chunk_size: usize,
    ) -> usize {
        detect_communities(store, provider, usize::MAX, concurrency, edge_chunk_size)
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_detect_communities_empty_graph() {
        let store = setup().await;
        let provider = mock_provider();
        let count = detect(&store, &provider, 4, 0).await;
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_detect_communities_single_entity() {
        let store = setup().await;
        let provider = mock_provider();
        add_entity(&store, "Solo").await;

        let count = detect(&store, &provider, 4, 0).await;
        assert_eq!(count, 0, "single isolated entity must not form a community");
    }

    #[tokio::test]
    async fn test_single_entity_community_filtered() {
        let store = setup().await;
        let provider = mock_provider();

        // A-B-C chain plus one entity with no edges.
        let a = add_entity(&store, "A").await;
        let b = add_entity(&store, "B").await;
        let c = add_entity(&store, "C").await;
        let iso = add_entity(&store, "Isolated").await;
        add_edge(&store, a, b, "r", "A relates B").await;
        add_edge(&store, b, c, "r", "B relates C").await;

        let count = detect(&store, &provider, 4, 0).await;
        assert_eq!(count, 1, "only the 3-entity cluster should be detected");

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 1);
        assert!(
            !communities[0].entity_ids.contains(&iso),
            "isolated entity must not be in any community"
        );
    }

    #[tokio::test]
    async fn test_label_propagation_basic() {
        let store = setup().await;
        let provider = mock_provider();

        // Four disconnected 3-node chains.
        let mut clusters: Vec<Vec<i64>> = Vec::with_capacity(4);
        for cluster in 0..4_i64 {
            let mut members = Vec::with_capacity(3);
            for node in 0..3_i64 {
                members.push(add_entity(&store, &format!("c{cluster}_n{node}")).await);
            }
            add_edge(&store, members[0], members[1], "r", "f").await;
            add_edge(&store, members[1], members[2], "r", "f").await;
            clusters.push(members);
        }

        let count = detect(&store, &provider, 4, 0).await;
        assert_eq!(count, 4, "expected 4 communities, one per cluster");

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 4);

        for members in &clusters {
            let hits = communities
                .iter()
                .filter(|community| members.iter().any(|id| community.entity_ids.contains(id)))
                .count();
            assert_eq!(
                hits, 1,
                "all nodes of a cluster must be in the same community"
            );
        }
    }

    #[tokio::test]
    async fn test_all_isolated_nodes() {
        let store = setup().await;
        let provider = mock_provider();

        for i in 0..5_i64 {
            add_entity(&store, &format!("iso_{i}")).await;
        }

        let count = detect(&store, &provider, 4, 0).await;
        assert_eq!(count, 0, "zero-edge graph must produce no communities");
        assert_eq!(store.community_count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_eviction_expired_edges() {
        let store = setup().await;

        let a = add_entity(&store, "EA").await;
        let b = add_entity(&store, "EB").await;
        let edge_id = add_edge(&store, a, b, "r", "f").await;
        store.invalidate_edge(edge_id).await.unwrap();

        // Backdate the expiry well past the 90-day retention used below.
        sqlx::query(
            "UPDATE graph_edges SET expired_at = datetime('now', '-200 days') WHERE id = ?1",
        )
        .bind(edge_id)
        .execute(store.pool())
        .await
        .unwrap();

        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
        assert_eq!(stats.expired_edges_deleted, 1);
    }

    #[tokio::test]
    async fn test_eviction_orphan_entities() {
        let store = setup().await;

        let orphan = add_entity(&store, "Orphan").await;

        // Make the orphan look long unseen.
        sqlx::query(
            "UPDATE graph_entities SET last_seen_at = datetime('now', '-200 days') WHERE id = ?1",
        )
        .bind(orphan)
        .execute(store.pool())
        .await
        .unwrap();

        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
        assert_eq!(stats.orphan_entities_deleted, 1);
    }

    #[tokio::test]
    async fn test_eviction_entity_cap() {
        let store = setup().await;

        for i in 0..5_i64 {
            add_entity(&store, &format!("cap_entity_{i}")).await;
        }

        let stats = run_graph_eviction(&store, 90, 3).await.unwrap();
        assert_eq!(
            stats.capped_entities_deleted, 2,
            "should delete 5-3=2 entities"
        );
        assert_eq!(store.entity_count().await.unwrap(), 3);
    }

    #[tokio::test]
    async fn test_assign_to_community_no_neighbors() {
        let store = setup().await;
        let loner = add_entity(&store, "Loner").await;

        let assigned = assign_to_community(&store, loner).await.unwrap();
        assert!(assigned.is_none());
    }

    #[tokio::test]
    async fn test_extraction_count_persistence() {
        use tempfile::NamedTempFile;
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_owned();

        let writer = open_store(&path).await;
        writer.set_metadata("extraction_count", "0").await.unwrap();
        for i in 1..=5_i64 {
            writer
                .set_metadata("extraction_count", &i.to_string())
                .await
                .unwrap();
        }

        // A second store opened on the same file must observe the persisted value.
        let reader = open_store(&path).await;
        assert_eq!(reader.extraction_count().await.unwrap(), 5);
    }

    #[test]
    fn test_scrub_content_ascii_control() {
        assert_eq!(scrub_content("hello\nworld\r\x00\x01\x09end"), "helloworldend");
    }

    #[test]
    fn test_scrub_content_bidi_overrides() {
        assert_eq!(
            scrub_content("safe\u{202A}inject\u{202E}end\u{2066}iso\u{2069}done"),
            "safeinjectendisodone"
        );
    }

    #[test]
    fn test_scrub_content_zero_width() {
        assert_eq!(scrub_content("a\u{200B}b\u{200C}c\u{200D}d\u{200F}e"), "abcde");
    }

    #[test]
    fn test_scrub_content_bom() {
        assert_eq!(scrub_content("\u{FEFF}hello"), "hello");
    }

    #[test]
    fn test_scrub_content_clean_string_unchanged() {
        let clean = "Hello, World! 123 — normal text.";
        assert_eq!(scrub_content(clean), clean);
    }

    #[test]
    fn test_truncate_prompt_within_limit() {
        assert_eq!(truncate_prompt("short".into(), 100), "short");
    }

    #[test]
    fn test_truncate_prompt_zero_max_bytes() {
        assert_eq!(truncate_prompt("hello".into(), 0), "");
    }

    #[test]
    fn test_truncate_prompt_long_facts() {
        let prompt = (0..20)
            .map(|i| format!("fact_{i}_{}", "x".repeat(20)))
            .collect::<Vec<String>>()
            .join("\n");
        let result = truncate_prompt(prompt, 200);
        assert!(
            result.ends_with("..."),
            "truncated prompt must end with '...'"
        );
        // 200 content bytes + 3-byte ellipsis.
        assert!(result.len() <= 203);
        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
    }

    #[test]
    fn test_truncate_prompt_utf8_boundary() {
        // Each emoji is 4 bytes, so a 10-byte budget keeps only two of them.
        let result = truncate_prompt("🔥".repeat(100), 10);
        assert!(
            result.ends_with("..."),
            "truncated prompt must end with '...'"
        );
        assert_eq!(result.len(), 8 + 3, "2 emojis (8 bytes) + '...' (3 bytes)");
        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
    }

    #[tokio::test]
    async fn test_assign_to_community_majority_vote() {
        let store = setup().await;

        let a = add_entity(&store, "AA").await;
        let b = add_entity(&store, "BB").await;
        let d = add_entity(&store, "DD").await;

        store
            .upsert_community("test_community", "summary", &[a, b], None)
            .await
            .unwrap();

        // Both of D's neighbours belong to the same community.
        add_edge(&store, d, a, "r", "f").await;
        add_edge(&store, d, b, "r", "f").await;

        let assigned = assign_to_community(&store, d).await.unwrap();
        assert!(assigned.is_some());
        let community_id = assigned.unwrap();

        let community = store
            .find_community_by_id(community_id)
            .await
            .unwrap()
            .expect("returned community_id must reference an existing row");
        assert!(
            community.entity_ids.contains(&d),
            "D should be added to the community"
        );
        assert!(
            community.fingerprint.is_none(),
            "fingerprint must be cleared after assign_to_community"
        );
    }

    #[tokio::test]
    async fn test_incremental_detection_no_changes_skips_llm() {
        let store = setup().await;
        let (provider, calls) = recording_provider();

        let x = add_entity(&store, "X").await;
        let y = add_entity(&store, "Y").await;
        add_edge(&store, x, y, "r", "X relates Y").await;

        detect(&store, &provider, 4, 0).await;
        let first = recorded(&calls);
        assert_eq!(first, 1, "first run must produce exactly 1 LLM call");

        detect(&store, &provider, 4, 0).await;
        let second = recorded(&calls);
        assert_eq!(
            second, first,
            "second run with no graph changes must produce 0 additional LLM calls"
        );
    }

    #[tokio::test]
    async fn test_incremental_detection_edge_change_triggers_resummary() {
        let store = setup().await;
        let (provider, calls) = recording_provider();

        let p = add_entity(&store, "P").await;
        let q = add_entity(&store, "Q").await;
        add_edge(&store, p, q, "r", "P relates Q").await;

        detect(&store, &provider, 4, 0).await;
        assert_eq!(recorded(&calls), 1);

        // A reverse edge changes the intra-community edge set, hence the fingerprint.
        add_edge(&store, q, p, "r2", "Q also relates P").await;

        detect(&store, &provider, 4, 0).await;
        assert_eq!(
            recorded(&calls),
            2,
            "edge change must trigger one additional LLM call"
        );
    }

    #[tokio::test]
    async fn test_incremental_detection_dissolved_community_deleted() {
        let store = setup().await;
        let provider = mock_provider();

        let m1 = add_entity(&store, "M1").await;
        let m2 = add_entity(&store, "M2").await;
        let edge_id = add_edge(&store, m1, m2, "r", "M1 relates M2").await;

        detect(&store, &provider, 4, 0).await;
        assert_eq!(store.community_count().await.unwrap(), 1);

        // Removing the only edge dissolves the community.
        store.invalidate_edge(edge_id).await.unwrap();

        detect(&store, &provider, 4, 0).await;
        assert_eq!(
            store.community_count().await.unwrap(),
            0,
            "dissolved community must be deleted on next refresh"
        );
    }

    #[tokio::test]
    async fn test_detect_communities_concurrency_one() {
        let store = setup().await;
        let provider = mock_provider();

        let a = add_entity(&store, "C1A").await;
        let b = add_entity(&store, "C1B").await;
        add_edge(&store, a, b, "r", "f").await;

        let count = detect(&store, &provider, 1, 0).await;
        assert_eq!(count, 1, "concurrency=1 must still detect the community");
        assert_eq!(store.community_count().await.unwrap(), 1);
    }

    #[test]
    fn test_compute_fingerprint_deterministic() {
        let base = compute_partition_fingerprint(&[1, 2, 3], &[10, 20]);
        let shuffled = compute_partition_fingerprint(&[3, 1, 2], &[20, 10]);
        assert_eq!(base, shuffled, "fingerprint must be order-independent");

        let other_edges = compute_partition_fingerprint(&[1, 2, 3], &[10, 30]);
        assert_ne!(
            base, other_edges,
            "different edge IDs must produce different fingerprint"
        );

        let other_entities = compute_partition_fingerprint(&[1, 2, 4], &[10, 20]);
        assert_ne!(
            base, other_entities,
            "different entity IDs must produce different fingerprint"
        );
    }

    #[test]
    fn test_compute_fingerprint_domain_separation() {
        // Same concatenated ids, split differently between the two sections.
        let split_late = compute_partition_fingerprint(&[1, 2], &[3]);
        let split_early = compute_partition_fingerprint(&[1], &[2, 3]);
        assert_ne!(
            split_late, split_early,
            "entity/edge sequences with same raw bytes must produce different fingerprints"
        );
    }

    #[tokio::test]
    async fn test_detect_communities_chunked_correct_membership() {
        let store = setup().await;
        let provider = mock_provider();

        let a = add_entity(&store, "CA").await;
        let b = add_entity(&store, "CB").await;
        let c = add_entity(&store, "CC").await;
        let d = add_entity(&store, "CD").await;
        let e = add_entity(&store, "CE").await;

        add_edge(&store, a, b, "r", "A-B fact").await;
        add_edge(&store, b, c, "r", "B-C fact").await;
        add_edge(&store, d, e, "r", "D-E fact").await;

        // Chunk size 1 forces one page per edge.
        let count_chunked = detect(&store, &provider, 4, 1).await;
        assert_eq!(
            count_chunked, 2,
            "chunked loading must detect both communities"
        );

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 2);

        let contains_all = |ids: &[i64]| {
            communities
                .iter()
                .any(|community| ids.iter().all(|id| community.entity_ids.contains(id)))
        };
        assert!(contains_all(&[a, b, c]), "cluster A-B-C must form a community");
        assert!(contains_all(&[d, e]), "cluster D-E must form a community");
    }

    #[tokio::test]
    async fn test_detect_communities_chunk_size_max() {
        let store = setup().await;
        let provider = mock_provider();

        let x = add_entity(&store, "MX").await;
        let y = add_entity(&store, "MY").await;
        add_edge(&store, x, y, "r", "X-Y fact").await;

        let count = detect(&store, &provider, 4, usize::MAX).await;
        assert_eq!(count, 1, "chunk_size=usize::MAX must detect the community");
    }

    #[tokio::test]
    async fn test_detect_communities_chunk_size_zero_fallback() {
        let store = setup().await;
        let provider = mock_provider();

        let p = add_entity(&store, "ZP").await;
        let q = add_entity(&store, "ZQ").await;
        add_edge(&store, p, q, "r", "P-Q fact").await;

        let count = detect(&store, &provider, 4, 0).await;
        assert_eq!(
            count, 1,
            "chunk_size=0 must detect the community via stream fallback"
        );
    }

    #[tokio::test]
    async fn test_detect_communities_chunked_edge_map_complete() {
        let store = setup().await;
        let (provider, calls) = recording_provider();

        let a = add_entity(&store, "FA").await;
        let b = add_entity(&store, "FB").await;
        add_edge(&store, a, b, "r", "edge1 fact").await;

        detect(&store, &provider, 4, 1).await;
        assert_eq!(recorded(&calls), 1, "first run must trigger 1 LLM call");

        add_edge(&store, b, a, "r2", "edge2 fact").await;

        detect(&store, &provider, 4, 1).await;
        assert_eq!(
            recorded(&calls),
            2,
            "adding an edge must change fingerprint and trigger re-summarization"
        );
    }
}