1use std::collections::HashMap;
5use std::sync::Arc;
6
7use futures::TryStreamExt as _;
8use petgraph::Graph;
9use petgraph::graph::NodeIndex;
10use tokio::sync::Semaphore;
11use tokio::task::JoinSet;
12use zeph_llm::LlmProvider as _;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{Message, Role};
15
16use crate::error::MemoryError;
17
18use super::store::GraphStore;
19use super::types::Entity;
20
/// Safety cap on label-propagation sweeps in case the labeling oscillates
/// instead of converging (the loop also exits early once no label changes).
const MAX_LABEL_PROPAGATION_ITERATIONS: usize = 50;
22
/// Strips ASCII/Unicode control characters and invisible characters
/// (zero-width marks, bidi overrides, directional isolates, BOM) from `s`.
///
/// Used to sanitize entity names and edge facts before they are interpolated
/// into LLM prompts, defusing invisible-text injection tricks.
fn scrub_content(s: &str) -> String {
    // Invisible code points that survive `is_control`: zero-width characters,
    // bidi embedding/override marks, directional isolates, and the BOM.
    fn is_invisible(code: u32) -> bool {
        matches!(
            code,
            0x200B..=0x200F | 0x202A..=0x202E | 0x2066..=0x2069 | 0xFEFF
        )
    }

    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        if !ch.is_control() && !is_invisible(ch as u32) {
            cleaned.push(ch);
        }
    }
    cleaned
}
41
/// Per-phase deletion counters reported by `run_graph_eviction`.
#[derive(Debug, Default)]
pub struct GraphEvictionStats {
    // Edges removed because they expired longer than the retention window ago.
    pub expired_edges_deleted: usize,
    // Entities removed by the orphan-cleanup phase (no surviving edges).
    pub orphan_entities_deleted: usize,
    // Entities removed to enforce the `max_entities` cap (0 when the cap is disabled).
    pub capped_entities_deleted: usize,
}
49
/// Caps `prompt` at `max_bytes` bytes, appending `"..."` when content was cut.
///
/// The cut point is walked back to the nearest UTF-8 character boundary so
/// the returned string is always valid. `max_bytes == 0` yields an empty
/// string, which the caller treats as "skip the LLM call entirely".
fn truncate_prompt(prompt: String, max_bytes: usize) -> String {
    match max_bytes {
        0 => String::new(),
        limit if prompt.len() <= limit => prompt,
        limit => {
            // Walk back from the byte limit until we land on a char boundary.
            let mut cut = limit;
            while !prompt.is_char_boundary(cut) {
                cut -= 1;
            }
            format!("{}...", &prompt[..cut])
        }
    }
}
65
66fn compute_partition_fingerprint(entity_ids: &[i64], intra_edge_ids: &[i64]) -> String {
72 let mut hasher = blake3::Hasher::new();
73 let mut sorted_entities = entity_ids.to_vec();
74 sorted_entities.sort_unstable();
75 hasher.update(b"entities");
76 for id in &sorted_entities {
77 hasher.update(&id.to_le_bytes());
78 }
79 let mut sorted_edges = intra_edge_ids.to_vec();
80 sorted_edges.sort_unstable();
81 hasher.update(b"edges");
82 for id in &sorted_edges {
83 hasher.update(&id.to_le_bytes());
84 }
85 hasher.finalize().to_hex().to_string()
86}
87
/// Everything needed to summarize and persist one detected community.
struct CommunityData {
    // Member entity ids (row ids in the graph store).
    entity_ids: Vec<i64>,
    // Scrubbed display names of the members, used in the summary prompt.
    entity_names: Vec<String>,
    // Scrubbed facts from edges whose both endpoints are in this community.
    intra_facts: Vec<String>,
    // Partition fingerprint; stored alongside the community to skip
    // re-summarization when membership/edges are unchanged.
    fingerprint: String,
    // Generated community name: first member names plus a positional suffix.
    name: String,
}
96
/// Undirected entity graph used for label propagation; node weights are entity ids.
type UndirectedGraph = Graph<i64, (), petgraph::Undirected>;
98
/// Builds the in-memory undirected entity graph plus edge lookup maps used by
/// community detection.
///
/// Returns `(graph, edge_facts_map, edge_id_map)` where:
/// - `graph` has one node per entity (weight = entity id) and one edge per
///   active store edge whose endpoints are both known entities,
/// - `edge_facts_map` maps `(source_entity_id, target_entity_id)` to the fact
///   strings carried by those edges,
/// - `edge_id_map` maps the same key to the edge row ids (used for
///   fingerprinting).
///
/// When `edge_chunk_size == 0` all active edges are streamed in one pass;
/// otherwise edges are paged by ascending id in chunks of `edge_chunk_size`
/// rows to bound peak memory.
async fn build_entity_graph_and_maps(
    store: &GraphStore,
    entities: &[Entity],
    edge_chunk_size: usize,
) -> Result<
    (
        UndirectedGraph,
        HashMap<(i64, i64), Vec<String>>,
        HashMap<(i64, i64), Vec<i64>>,
    ),
    MemoryError,
> {
    let mut graph = UndirectedGraph::new_undirected();
    let mut node_map: HashMap<i64, NodeIndex> = HashMap::new();

    for entity in entities {
        let idx = graph.add_node(entity.id);
        node_map.insert(entity.id, idx);
    }

    let mut edge_facts_map: HashMap<(i64, i64), Vec<String>> = HashMap::new();
    let mut edge_id_map: HashMap<(i64, i64), Vec<i64>> = HashMap::new();

    if edge_chunk_size == 0 {
        // Single-pass path: stream every active edge at once.
        let edges: Vec<_> = store.all_active_edges_stream().try_collect().await?;
        for edge in &edges {
            if let (Some(&src_idx), Some(&tgt_idx)) = (
                node_map.get(&edge.source_entity_id),
                node_map.get(&edge.target_entity_id),
            ) {
                graph.add_edge(src_idx, tgt_idx, ());
            }
            // NOTE: facts/ids are recorded even when an endpoint is not in
            // `node_map`; such edges just contribute no graph edge.
            let key = (edge.source_entity_id, edge.target_entity_id);
            edge_facts_map
                .entry(key)
                .or_default()
                .push(edge.fact.clone());
            edge_id_map.entry(key).or_default().push(edge.id);
        }
    } else {
        // Keyset pagination: repeatedly fetch rows with id > last_id.
        let limit = i64::try_from(edge_chunk_size).unwrap_or(i64::MAX);
        let mut last_id: i64 = 0;
        loop {
            let chunk = store.edges_after_id(last_id, limit).await?;
            if chunk.is_empty() {
                break;
            }
            last_id = chunk.last().expect("non-empty chunk has a last element").id;
            for edge in &chunk {
                if let (Some(&src_idx), Some(&tgt_idx)) = (
                    node_map.get(&edge.source_entity_id),
                    node_map.get(&edge.target_entity_id),
                ) {
                    graph.add_edge(src_idx, tgt_idx, ());
                }
                let key = (edge.source_entity_id, edge.target_entity_id);
                edge_facts_map
                    .entry(key)
                    .or_default()
                    .push(edge.fact.clone());
                edge_id_map.entry(key).or_default().push(edge.id);
            }
        }
    }

    Ok((graph, edge_facts_map, edge_id_map))
}
166
167fn run_label_propagation(graph: &UndirectedGraph) -> HashMap<usize, Vec<i64>> {
168 let mut labels: Vec<usize> = (0..graph.node_count()).collect();
169
170 for _ in 0..MAX_LABEL_PROPAGATION_ITERATIONS {
171 let mut changed = false;
172 for node_idx in graph.node_indices() {
173 let neighbors: Vec<NodeIndex> = graph.neighbors(node_idx).collect();
174 if neighbors.is_empty() {
175 continue;
176 }
177 let mut freq: HashMap<usize, usize> = HashMap::new();
178 for &nbr in &neighbors {
179 *freq.entry(labels[nbr.index()]).or_insert(0) += 1;
180 }
181 let max_count = *freq.values().max().unwrap_or(&0);
182 let best_label = freq
183 .iter()
184 .filter(|&(_, count)| *count == max_count)
185 .map(|(&label, _)| label)
186 .min()
187 .unwrap_or(labels[node_idx.index()]);
188 if labels[node_idx.index()] != best_label {
189 labels[node_idx.index()] = best_label;
190 changed = true;
191 }
192 }
193 if !changed {
194 break;
195 }
196 }
197
198 let mut communities: HashMap<usize, Vec<i64>> = HashMap::new();
199 for node_idx in graph.node_indices() {
200 let entity_id = graph[node_idx];
201 communities
202 .entry(labels[node_idx.index()])
203 .or_default()
204 .push(entity_id);
205 }
206 communities.retain(|_, members| members.len() >= 2);
207 communities
208}
209
/// Outcome of partition classification in `classify_communities`.
struct ClassifyResult {
    // Communities whose fingerprint changed and need (re)summarization.
    to_summarize: Vec<CommunityData>,
    // Communities whose fingerprint is already stored (no LLM call needed).
    unchanged_count: usize,
    // Fingerprints of every current community; stored communities whose
    // fingerprint is absent from this set are considered dissolved.
    new_fingerprints: std::collections::HashSet<String>,
}
215
216fn classify_communities(
217 communities: &HashMap<usize, Vec<i64>>,
218 edge_facts_map: &HashMap<(i64, i64), Vec<String>>,
219 edge_id_map: &HashMap<(i64, i64), Vec<i64>>,
220 entity_name_map: &HashMap<i64, &str>,
221 stored_fingerprints: &HashMap<String, i64>,
222 sorted_labels: &[usize],
223) -> ClassifyResult {
224 let mut to_summarize: Vec<CommunityData> = Vec::new();
225 let mut unchanged_count = 0usize;
226 let mut new_fingerprints: std::collections::HashSet<String> = std::collections::HashSet::new();
227
228 for (label_index, &label) in sorted_labels.iter().enumerate() {
229 let entity_ids = communities[&label].as_slice();
230 let member_set: std::collections::HashSet<i64> = entity_ids.iter().copied().collect();
231
232 let mut intra_facts: Vec<String> = Vec::new();
233 let mut intra_edge_ids: Vec<i64> = Vec::new();
234 for (&(src, tgt), facts) in edge_facts_map {
235 if member_set.contains(&src) && member_set.contains(&tgt) {
236 intra_facts.extend(facts.iter().map(|f| scrub_content(f)));
237 if let Some(ids) = edge_id_map.get(&(src, tgt)) {
238 intra_edge_ids.extend_from_slice(ids);
239 }
240 }
241 }
242
243 let fingerprint = compute_partition_fingerprint(entity_ids, &intra_edge_ids);
244 new_fingerprints.insert(fingerprint.clone());
245
246 if stored_fingerprints.contains_key(&fingerprint) {
247 unchanged_count += 1;
248 continue;
249 }
250
251 let entity_names: Vec<String> = entity_ids
252 .iter()
253 .filter_map(|id| entity_name_map.get(id).map(|&s| scrub_content(s)))
254 .collect();
255
256 let base_name = entity_names
259 .iter()
260 .take(3)
261 .cloned()
262 .collect::<Vec<_>>()
263 .join(", ");
264 let name = format!("{base_name} [{label_index}]");
265
266 to_summarize.push(CommunityData {
267 entity_ids: entity_ids.to_vec(),
268 entity_names,
269 intra_facts,
270 fingerprint,
271 name,
272 });
273 }
274
275 ClassifyResult {
276 to_summarize,
277 unchanged_count,
278 new_fingerprints,
279 }
280}
281
/// Generates summaries for `to_summarize` with at most `concurrency`
/// concurrent LLM calls, then upserts each community into `store`.
///
/// A failed summary generation is logged and the community is stored with an
/// empty summary rather than aborting the batch; a panicked/cancelled task is
/// logged and skipped. Results are sorted by community name before upserting
/// so the write order is deterministic. Returns the number of communities
/// upserted.
async fn summarize_and_upsert_communities(
    store: &GraphStore,
    provider: &AnyProvider,
    to_summarize: Vec<CommunityData>,
    concurrency: usize,
    community_summary_max_prompt_bytes: usize,
) -> Result<usize, MemoryError> {
    // `max(1)` guards against a zero-permit semaphore that would deadlock.
    let semaphore = Arc::new(Semaphore::new(concurrency.max(1)));
    let mut join_set: JoinSet<(String, String, Vec<i64>, String)> = JoinSet::new();

    for data in to_summarize {
        let provider = provider.clone();
        let sem = Arc::clone(&semaphore);
        let max_bytes = community_summary_max_prompt_bytes;
        join_set.spawn(async move {
            let _permit = sem.acquire().await.expect("semaphore is never closed");
            let summary = match generate_community_summary(
                &provider,
                &data.entity_names,
                &data.intra_facts,
                max_bytes,
            )
            .await
            {
                Ok(text) => text,
                Err(e) => {
                    tracing::warn!(community = %data.name, "community summary generation failed: {e:#}");
                    String::new()
                }
            };
            (data.name, summary, data.entity_ids, data.fingerprint)
        });
    }

    // Tasks complete in arbitrary order; gather everything first.
    let mut results: Vec<(String, String, Vec<i64>, String)> = Vec::new();
    while let Some(outcome) = join_set.join_next().await {
        match outcome {
            Ok(tuple) => results.push(tuple),
            Err(e) => {
                tracing::error!(
                    panicked = e.is_panic(),
                    cancelled = e.is_cancelled(),
                    "community summary task failed"
                );
            }
        }
    }

    // Sort by name so upserts happen in a deterministic order.
    results.sort_unstable_by(|a, b| a.0.cmp(&b.0));

    let mut count = 0usize;
    for (name, summary, entity_ids, fingerprint) in results {
        store
            .upsert_community(&name, &summary, &entity_ids, Some(&fingerprint))
            .await?;
        count += 1;
    }

    Ok(count)
}
343
/// Detects entity communities via label propagation and syncs the stored set,
/// summarizing (via the LLM) only partitions whose fingerprint changed.
///
/// Pipeline: load entities -> build undirected graph + edge maps -> label
/// propagation -> classify partitions as unchanged vs needing summarization
/// -> delete stored communities whose fingerprint no longer appears ->
/// summarize and upsert the changed partitions.
///
/// `edge_chunk_size == 0` streams all edges in one pass instead of paging.
/// Returns the total number of current communities (unchanged + newly
/// summarized). Fewer than two entities can never form a community, so that
/// case short-circuits to 0.
pub async fn detect_communities(
    store: &GraphStore,
    provider: &AnyProvider,
    community_summary_max_prompt_bytes: usize,
    concurrency: usize,
    edge_chunk_size: usize,
) -> Result<usize, MemoryError> {
    let entities = store.all_entities().await?;
    if entities.len() < 2 {
        return Ok(0);
    }

    let (graph, edge_facts_map, edge_id_map) =
        build_entity_graph_and_maps(store, &entities, edge_chunk_size).await?;

    let communities = run_label_propagation(&graph);

    let entity_name_map: HashMap<i64, &str> =
        entities.iter().map(|e| (e.id, e.name.as_str())).collect();
    let stored_fingerprints = store.community_fingerprints().await?;

    // Stable label order keeps generated community names deterministic.
    let mut sorted_labels: Vec<usize> = communities.keys().copied().collect();
    sorted_labels.sort_unstable();

    let ClassifyResult {
        to_summarize,
        unchanged_count,
        new_fingerprints,
    } = classify_communities(
        &communities,
        &edge_facts_map,
        &edge_id_map,
        &entity_name_map,
        &stored_fingerprints,
        &sorted_labels,
    );

    tracing::debug!(
        total = sorted_labels.len(),
        unchanged = unchanged_count,
        to_summarize = to_summarize.len(),
        "community detection: partition classification complete"
    );

    // Any stored community whose fingerprint is absent from the current
    // partition set has dissolved; remove it.
    for (stored_fp, community_id) in &stored_fingerprints {
        if !new_fingerprints.contains(stored_fp.as_str()) {
            store.delete_community_by_id(*community_id).await?;
        }
    }

    let new_count = summarize_and_upsert_communities(
        store,
        provider,
        to_summarize,
        concurrency,
        community_summary_max_prompt_bytes,
    )
    .await?;

    Ok(unchanged_count + new_count)
}
424
/// Incrementally assigns `entity_id` to an existing community by majority
/// vote over the communities of its direct neighbors.
///
/// Ties break toward the smallest community id, keeping the result
/// deterministic. On assignment the community's stored fingerprint is cleared
/// so the next full `detect_communities` run re-summarizes it. Returns the
/// chosen community id, or `None` when the entity has no edges, no neighbor
/// belongs to a community, or the winning community row no longer exists.
pub async fn assign_to_community(
    store: &GraphStore,
    entity_id: i64,
) -> Result<Option<i64>, MemoryError> {
    let edges = store.edges_for_entity(entity_id).await?;
    if edges.is_empty() {
        return Ok(None);
    }

    // The "other" endpoint of each incident edge.
    let neighbor_ids: Vec<i64> = edges
        .iter()
        .map(|e| {
            if e.source_entity_id == entity_id {
                e.target_entity_id
            } else {
                e.source_entity_id
            }
        })
        .collect();

    let mut community_votes: HashMap<i64, usize> = HashMap::new();
    for &nbr_id in &neighbor_ids {
        if let Some(community) = store.community_for_entity(nbr_id).await? {
            *community_votes.entry(community.id).or_insert(0) += 1;
        }
    }

    if community_votes.is_empty() {
        return Ok(None);
    }

    // Highest vote count wins; `id_b.cmp(&id_a)` makes the SMALLER id compare
    // greater on ties, so tied votes resolve to the smallest community id.
    let Some((&best_community_id, _)) =
        community_votes
            .iter()
            .max_by(|&(&id_a, &count_a), &(&id_b, &count_b)| {
                count_a.cmp(&count_b).then(id_b.cmp(&id_a))
            })
    else {
        return Ok(None);
    };

    if let Some(mut target) = store.find_community_by_id(best_community_id).await? {
        if !target.entity_ids.contains(&entity_id) {
            target.entity_ids.push(entity_id);
            store
                .upsert_community(&target.name, &target.summary, &target.entity_ids, None)
                .await?;
            // Membership changed out-of-band: invalidate the fingerprint so
            // the next detection pass regenerates the summary.
            store.clear_community_fingerprint(best_community_id).await?;
        }
        return Ok(Some(best_community_id));
    }

    Ok(None)
}
492
/// Placeholder for stale-entity-embedding cleanup; currently a no-op that
/// always reports 0 deletions.
///
/// NOTE(review): both parameters are unused — presumably this will diff the
/// graph's entity ids against the embedding store once implemented; confirm
/// the intended semantics before relying on the return value.
pub async fn cleanup_stale_entity_embeddings(
    _store: &GraphStore,
    _embeddings: &crate::embedding_store::EmbeddingStore,
) -> Result<usize, MemoryError> {
    Ok(0)
}
510
511pub async fn run_graph_eviction(
517 store: &GraphStore,
518 expired_edge_retention_days: u32,
519 max_entities: usize,
520) -> Result<GraphEvictionStats, MemoryError> {
521 let expired_edges_deleted = store
522 .delete_expired_edges(expired_edge_retention_days)
523 .await?;
524 let orphan_entities_deleted = store
525 .delete_orphan_entities(expired_edge_retention_days)
526 .await?;
527 let capped_entities_deleted = if max_entities > 0 {
528 store.cap_entities(max_entities).await?
529 } else {
530 0
531 };
532
533 Ok(GraphEvictionStats {
534 expired_edges_deleted,
535 orphan_entities_deleted,
536 capped_entities_deleted,
537 })
538}
539
/// Asks the LLM for a short (2-3 sentence) summary of a community, built from
/// its entity names and at most the first 20 of its intra-community facts.
///
/// The prompt is capped at `max_prompt_bytes`; a cap of 0 produces an empty
/// prompt, in which case the LLM call is skipped and an empty summary is
/// returned. Truncation is logged with original and truncated byte sizes.
///
/// # Errors
/// Returns [`MemoryError::Llm`] when the provider call fails.
async fn generate_community_summary(
    provider: &AnyProvider,
    entity_names: &[String],
    edge_facts: &[String],
    max_prompt_bytes: usize,
) -> Result<String, MemoryError> {
    let entities_str = entity_names.join(", ");
    // Only the first 20 facts are included to bound prompt size.
    let facts_str = edge_facts
        .iter()
        .take(20)
        .map(|f| format!("- {f}"))
        .collect::<Vec<_>>()
        .join("\n");

    let raw_prompt = format!(
        "Summarize the following group of related entities and their relationships \
         into a single paragraph (2-3 sentences). Focus on the theme that connects \
         them and the key relationships.\n\nEntities: {entities_str}\n\
         Relationships:\n{facts_str}\n\nSummary:"
    );

    // Record sizes before `raw_prompt` is consumed by `truncate_prompt`.
    let original_bytes = raw_prompt.len();
    let truncated = raw_prompt.len() > max_prompt_bytes;
    let prompt = truncate_prompt(raw_prompt, max_prompt_bytes);
    if prompt.is_empty() {
        return Ok(String::new());
    }
    if truncated {
        tracing::warn!(
            entity_count = entity_names.len(),
            original_bytes,
            truncated_bytes = prompt.len(),
            "community summary prompt truncated"
        );
    }

    let messages = [Message::from_legacy(Role::User, prompt)];
    let response: String = provider.chat(&messages).await.map_err(MemoryError::Llm)?;
    Ok(response)
}
581
// Unit and integration tests. Integration tests run against an in-memory
// SQLite-backed `GraphStore` with a mock LLM provider; `recording_provider`
// additionally captures every prompt sent so tests can count LLM calls and
// verify the incremental (fingerprint-based) summarization behavior.
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;
    use crate::graph::types::EntityType;
    use crate::sqlite::SqliteStore;

    // Fresh in-memory graph store per test.
    async fn setup() -> GraphStore {
        let store = SqliteStore::new(":memory:").await.unwrap();
        GraphStore::new(store.pool().clone())
    }

    fn mock_provider() -> AnyProvider {
        AnyProvider::Mock(zeph_llm::mock::MockProvider::default())
    }

    // Mock provider that records each message batch sent to `chat`.
    fn recording_provider() -> (
        AnyProvider,
        Arc<Mutex<Vec<Vec<zeph_llm::provider::Message>>>>,
    ) {
        let (mock, buf) = zeph_llm::mock::MockProvider::default().with_recording();
        (AnyProvider::Mock(mock), buf)
    }

    #[tokio::test]
    async fn test_detect_communities_empty_graph() {
        let store = setup().await;
        let provider = mock_provider();
        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_detect_communities_single_entity() {
        let store = setup().await;
        let provider = mock_provider();
        store
            .upsert_entity("Solo", "Solo", EntityType::Concept, None)
            .await
            .unwrap();
        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 0, "single isolated entity must not form a community");
    }

    // Singleton communities are filtered by the `len() >= 2` retain in
    // run_label_propagation.
    #[tokio::test]
    async fn test_single_entity_community_filtered() {
        let store = setup().await;
        let provider = mock_provider();

        let a = store
            .upsert_entity("A", "A", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("B", "B", EntityType::Concept, None)
            .await
            .unwrap();
        let c = store
            .upsert_entity("C", "C", EntityType::Concept, None)
            .await
            .unwrap();
        let iso = store
            .upsert_entity("Isolated", "Isolated", EntityType::Concept, None)
            .await
            .unwrap();

        store
            .insert_edge(a, b, "r", "A relates B", 1.0, None)
            .await
            .unwrap();
        store
            .insert_edge(b, c, "r", "B relates C", 1.0, None)
            .await
            .unwrap();

        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 1, "only the 3-entity cluster should be detected");

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 1);
        assert!(
            !communities[0].entity_ids.contains(&iso),
            "isolated entity must not be in any community"
        );
    }

    // Four disjoint 3-node chains must yield exactly four communities.
    #[tokio::test]
    async fn test_label_propagation_basic() {
        let store = setup().await;
        let provider = mock_provider();

        let mut cluster_ids: Vec<Vec<i64>> = Vec::new();
        for cluster in 0..4_i64 {
            let mut ids = Vec::new();
            for node in 0..3_i64 {
                let name = format!("c{cluster}_n{node}");
                let id = store
                    .upsert_entity(&name, &name, EntityType::Concept, None)
                    .await
                    .unwrap();
                ids.push(id);
            }
            store
                .insert_edge(ids[0], ids[1], "r", "f", 1.0, None)
                .await
                .unwrap();
            store
                .insert_edge(ids[1], ids[2], "r", "f", 1.0, None)
                .await
                .unwrap();
            cluster_ids.push(ids);
        }

        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 4, "expected 4 communities, one per cluster");

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 4);

        for ids in &cluster_ids {
            let found = communities
                .iter()
                .filter(|c| ids.iter().any(|id| c.entity_ids.contains(id)))
                .count();
            assert_eq!(
                found, 1,
                "all nodes of a cluster must be in the same community"
            );
        }
    }

    #[tokio::test]
    async fn test_all_isolated_nodes() {
        let store = setup().await;
        let provider = mock_provider();

        for i in 0..5_i64 {
            store
                .upsert_entity(
                    &format!("iso_{i}"),
                    &format!("iso_{i}"),
                    EntityType::Concept,
                    None,
                )
                .await
                .unwrap();
        }

        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 0, "zero-edge graph must produce no communities");
        assert_eq!(store.community_count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_eviction_expired_edges() {
        let store = setup().await;

        let a = store
            .upsert_entity("EA", "EA", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("EB", "EB", EntityType::Concept, None)
            .await
            .unwrap();
        let edge_id = store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
        store.invalidate_edge(edge_id).await.unwrap();

        // Backdate the expiry beyond the 90-day retention window.
        sqlx::query(
            "UPDATE graph_edges SET expired_at = datetime('now', '-200 days') WHERE id = ?1",
        )
        .bind(edge_id)
        .execute(store.pool())
        .await
        .unwrap();

        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
        assert_eq!(stats.expired_edges_deleted, 1);
    }

    #[tokio::test]
    async fn test_eviction_orphan_entities() {
        let store = setup().await;

        let iso = store
            .upsert_entity("Orphan", "Orphan", EntityType::Concept, None)
            .await
            .unwrap();

        // Backdate last_seen_at so the edgeless entity qualifies as an orphan.
        sqlx::query(
            "UPDATE graph_entities SET last_seen_at = datetime('now', '-200 days') WHERE id = ?1",
        )
        .bind(iso)
        .execute(store.pool())
        .await
        .unwrap();

        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
        assert_eq!(stats.orphan_entities_deleted, 1);
    }

    #[tokio::test]
    async fn test_eviction_entity_cap() {
        let store = setup().await;

        for i in 0..5_i64 {
            let name = format!("cap_entity_{i}");
            store
                .upsert_entity(&name, &name, EntityType::Concept, None)
                .await
                .unwrap();
        }

        let stats = run_graph_eviction(&store, 90, 3).await.unwrap();
        assert_eq!(
            stats.capped_entities_deleted, 2,
            "should delete 5-3=2 entities"
        );
        assert_eq!(store.entity_count().await.unwrap(), 3);
    }

    #[tokio::test]
    async fn test_assign_to_community_no_neighbors() {
        let store = setup().await;
        let entity_id = store
            .upsert_entity("Loner", "Loner", EntityType::Concept, None)
            .await
            .unwrap();

        let result = assign_to_community(&store, entity_id).await.unwrap();
        assert!(result.is_none());
    }

    // Metadata must survive a store reopen (file-backed DB, not :memory:).
    #[tokio::test]
    async fn test_extraction_count_persistence() {
        use tempfile::NamedTempFile;
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_owned();

        let store1 = {
            let s = crate::sqlite::SqliteStore::new(&path).await.unwrap();
            GraphStore::new(s.pool().clone())
        };

        store1.set_metadata("extraction_count", "0").await.unwrap();
        for i in 1..=5_i64 {
            store1
                .set_metadata("extraction_count", &i.to_string())
                .await
                .unwrap();
        }

        let store2 = {
            let s = crate::sqlite::SqliteStore::new(&path).await.unwrap();
            GraphStore::new(s.pool().clone())
        };
        assert_eq!(store2.extraction_count().await.unwrap(), 5);
    }

    #[test]
    fn test_scrub_content_ascii_control() {
        let input = "hello\nworld\r\x00\x01\x09end";
        assert_eq!(scrub_content(input), "helloworldend");
    }

    #[test]
    fn test_scrub_content_bidi_overrides() {
        let input = "safe\u{202A}inject\u{202E}end\u{2066}iso\u{2069}done".to_string();
        assert_eq!(scrub_content(&input), "safeinjectendisodone");
    }

    #[test]
    fn test_scrub_content_zero_width() {
        let input = "a\u{200B}b\u{200C}c\u{200D}d\u{200F}e".to_string();
        assert_eq!(scrub_content(&input), "abcde");
    }

    #[test]
    fn test_scrub_content_bom() {
        let input = "\u{FEFF}hello".to_string();
        assert_eq!(scrub_content(&input), "hello");
    }

    #[test]
    fn test_scrub_content_clean_string_unchanged() {
        let input = "Hello, World! 123 — normal text.";
        assert_eq!(scrub_content(input), input);
    }

    #[test]
    fn test_truncate_prompt_within_limit() {
        let result = truncate_prompt("short".into(), 100);
        assert_eq!(result, "short");
    }

    #[test]
    fn test_truncate_prompt_zero_max_bytes() {
        let result = truncate_prompt("hello".into(), 0);
        assert_eq!(result, "");
    }

    #[test]
    fn test_truncate_prompt_long_facts() {
        let facts: Vec<String> = (0..20)
            .map(|i| format!("fact_{i}_{}", "x".repeat(20)))
            .collect();
        let prompt = facts.join("\n");
        let result = truncate_prompt(prompt, 200);
        assert!(
            result.ends_with("..."),
            "truncated prompt must end with '...'"
        );
        assert!(result.len() <= 203);
        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
    }

    // Truncation must not split a multi-byte character.
    #[test]
    fn test_truncate_prompt_utf8_boundary() {
        let prompt = "🔥".repeat(100);
        let result = truncate_prompt(prompt, 10);
        assert!(
            result.ends_with("..."),
            "truncated prompt must end with '...'"
        );
        assert_eq!(result.len(), 8 + 3, "2 emojis (8 bytes) + '...' (3 bytes)");
        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
    }

    #[tokio::test]
    async fn test_assign_to_community_majority_vote() {
        let store = setup().await;

        let a = store
            .upsert_entity("AA", "AA", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("BB", "BB", EntityType::Concept, None)
            .await
            .unwrap();
        let d = store
            .upsert_entity("DD", "DD", EntityType::Concept, None)
            .await
            .unwrap();

        store
            .upsert_community("test_community", "summary", &[a, b], None)
            .await
            .unwrap();

        store.insert_edge(d, a, "r", "f", 1.0, None).await.unwrap();
        store.insert_edge(d, b, "r", "f", 1.0, None).await.unwrap();

        let result = assign_to_community(&store, d).await.unwrap();
        assert!(result.is_some());

        let returned_id = result.unwrap();
        let community = store
            .find_community_by_id(returned_id)
            .await
            .unwrap()
            .expect("returned community_id must reference an existing row");
        assert!(
            community.entity_ids.contains(&d),
            "D should be added to the community"
        );
        assert!(
            community.fingerprint.is_none(),
            "fingerprint must be cleared after assign_to_community"
        );
    }

    // A second detection pass over an unchanged graph must hit the
    // fingerprint cache and skip the LLM entirely.
    #[tokio::test]
    async fn test_incremental_detection_no_changes_skips_llm() {
        let store = setup().await;
        let (provider, call_buf) = recording_provider();

        let a = store
            .upsert_entity("X", "X", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("Y", "Y", EntityType::Concept, None)
            .await
            .unwrap();
        store
            .insert_edge(a, b, "r", "X relates Y", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        let first_calls = call_buf.lock().unwrap().len();
        assert_eq!(first_calls, 1, "first run must produce exactly 1 LLM call");

        detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        let second_calls = call_buf.lock().unwrap().len();
        assert_eq!(
            second_calls, first_calls,
            "second run with no graph changes must produce 0 additional LLM calls"
        );
    }

    // Adding an edge changes the partition fingerprint, forcing one
    // re-summarization.
    #[tokio::test]
    async fn test_incremental_detection_edge_change_triggers_resummary() {
        let store = setup().await;
        let (provider, call_buf) = recording_provider();

        let a = store
            .upsert_entity("P", "P", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("Q", "Q", EntityType::Concept, None)
            .await
            .unwrap();
        store
            .insert_edge(a, b, "r", "P relates Q", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        let after_first = call_buf.lock().unwrap().len();
        assert_eq!(after_first, 1);

        store
            .insert_edge(b, a, "r2", "Q also relates P", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        let after_second = call_buf.lock().unwrap().len();
        assert_eq!(
            after_second, 2,
            "edge change must trigger one additional LLM call"
        );
    }

    // A community whose only edge is invalidated must be deleted by the
    // dissolved-fingerprint sweep in detect_communities.
    #[tokio::test]
    async fn test_incremental_detection_dissolved_community_deleted() {
        let store = setup().await;
        let provider = mock_provider();

        let a = store
            .upsert_entity("M1", "M1", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("M2", "M2", EntityType::Concept, None)
            .await
            .unwrap();
        let edge_id = store
            .insert_edge(a, b, "r", "M1 relates M2", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(store.community_count().await.unwrap(), 1);

        store.invalidate_edge(edge_id).await.unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(
            store.community_count().await.unwrap(),
            0,
            "dissolved community must be deleted on next refresh"
        );
    }

    #[tokio::test]
    async fn test_detect_communities_concurrency_one() {
        let store = setup().await;
        let provider = mock_provider();

        let a = store
            .upsert_entity("C1A", "C1A", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("C1B", "C1B", EntityType::Concept, None)
            .await
            .unwrap();
        store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();

        let count = detect_communities(&store, &provider, usize::MAX, 1, 0)
            .await
            .unwrap();
        assert_eq!(count, 1, "concurrency=1 must still detect the community");
        assert_eq!(store.community_count().await.unwrap(), 1);
    }

    #[test]
    fn test_compute_fingerprint_deterministic() {
        let fp1 = compute_partition_fingerprint(&[1, 2, 3], &[10, 20]);
        let fp2 = compute_partition_fingerprint(&[3, 1, 2], &[20, 10]);
        assert_eq!(fp1, fp2, "fingerprint must be order-independent");

        let fp3 = compute_partition_fingerprint(&[1, 2, 3], &[10, 30]);
        assert_ne!(
            fp1, fp3,
            "different edge IDs must produce different fingerprint"
        );

        let fp4 = compute_partition_fingerprint(&[1, 2, 4], &[10, 20]);
        assert_ne!(
            fp1, fp4,
            "different entity IDs must produce different fingerprint"
        );
    }

    // The "entities"/"edges" tags must prevent collisions between inputs
    // whose concatenated id bytes are identical.
    #[test]
    fn test_compute_fingerprint_domain_separation() {
        let fp_a = compute_partition_fingerprint(&[1, 2], &[3]);
        let fp_b = compute_partition_fingerprint(&[1], &[2, 3]);
        assert_ne!(
            fp_a, fp_b,
            "entity/edge sequences with same raw bytes must produce different fingerprints"
        );
    }

    // Chunked (paged) edge loading must find the same communities as the
    // streaming path.
    #[tokio::test]
    async fn test_detect_communities_chunked_correct_membership() {
        let store = setup().await;
        let provider = mock_provider();

        let a = store
            .upsert_entity("CA", "CA", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("CB", "CB", EntityType::Concept, None)
            .await
            .unwrap();
        let c = store
            .upsert_entity("CC", "CC", EntityType::Concept, None)
            .await
            .unwrap();
        let d = store
            .upsert_entity("CD", "CD", EntityType::Concept, None)
            .await
            .unwrap();
        let e = store
            .upsert_entity("CE", "CE", EntityType::Concept, None)
            .await
            .unwrap();

        store
            .insert_edge(a, b, "r", "A-B fact", 1.0, None)
            .await
            .unwrap();
        store
            .insert_edge(b, c, "r", "B-C fact", 1.0, None)
            .await
            .unwrap();
        store
            .insert_edge(d, e, "r", "D-E fact", 1.0, None)
            .await
            .unwrap();

        let count_chunked = detect_communities(&store, &provider, usize::MAX, 4, 1)
            .await
            .unwrap();
        assert_eq!(
            count_chunked, 2,
            "chunked loading must detect both communities"
        );

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 2);

        let abc_ids = [a, b, c];
        let de_ids = [d, e];
        let has_abc = communities
            .iter()
            .any(|c| abc_ids.iter().all(|id| c.entity_ids.contains(id)));
        let has_de = communities
            .iter()
            .any(|c| de_ids.iter().all(|id| c.entity_ids.contains(id)));
        assert!(has_abc, "cluster A-B-C must form a community");
        assert!(has_de, "cluster D-E must form a community");
    }

    // chunk_size = usize::MAX exercises the i64 saturation in the paging path.
    #[tokio::test]
    async fn test_detect_communities_chunk_size_max() {
        let store = setup().await;
        let provider = mock_provider();

        let x = store
            .upsert_entity("MX", "MX", EntityType::Concept, None)
            .await
            .unwrap();
        let y = store
            .upsert_entity("MY", "MY", EntityType::Concept, None)
            .await
            .unwrap();
        store
            .insert_edge(x, y, "r", "X-Y fact", 1.0, None)
            .await
            .unwrap();

        let count = detect_communities(&store, &provider, usize::MAX, 4, usize::MAX)
            .await
            .unwrap();
        assert_eq!(count, 1, "chunk_size=usize::MAX must detect the community");
    }

    #[tokio::test]
    async fn test_detect_communities_chunk_size_zero_fallback() {
        let store = setup().await;
        let provider = mock_provider();

        let p = store
            .upsert_entity("ZP", "ZP", EntityType::Concept, None)
            .await
            .unwrap();
        let q = store
            .upsert_entity("ZQ", "ZQ", EntityType::Concept, None)
            .await
            .unwrap();
        store
            .insert_edge(p, q, "r", "P-Q fact", 1.0, None)
            .await
            .unwrap();

        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(
            count, 1,
            "chunk_size=0 must detect the community via stream fallback"
        );
    }

    // Verifies the chunked loader records every edge in the fingerprint maps
    // (a dropped edge would leave the fingerprint unchanged and skip the
    // second LLM call).
    #[tokio::test]
    async fn test_detect_communities_chunked_edge_map_complete() {
        let store = setup().await;
        let (provider, call_buf) = recording_provider();

        let a = store
            .upsert_entity("FA", "FA", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("FB", "FB", EntityType::Concept, None)
            .await
            .unwrap();
        store
            .insert_edge(a, b, "r", "edge1 fact", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 1)
            .await
            .unwrap();
        let calls_after_first = call_buf.lock().unwrap().len();
        assert_eq!(calls_after_first, 1, "first run must trigger 1 LLM call");

        store
            .insert_edge(b, a, "r2", "edge2 fact", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 1)
            .await
            .unwrap();
        let calls_after_second = call_buf.lock().unwrap().len();
        assert_eq!(
            calls_after_second, 2,
            "adding an edge must change fingerprint and trigger re-summarization"
        );
    }
}