1use std::collections::HashMap;
5use std::sync::Arc;
6#[allow(unused_imports)]
7use zeph_db::sql;
8
9use futures::TryStreamExt as _;
10use petgraph::Graph;
11use petgraph::graph::NodeIndex;
12use tokio::sync::Semaphore;
13use tokio::task::JoinSet;
14use zeph_llm::LlmProvider as _;
15use zeph_llm::any::AnyProvider;
16use zeph_llm::provider::{Message, Role};
17
18use crate::error::MemoryError;
19
20use super::store::GraphStore;
21use super::types::Entity;
22
/// Upper bound on full label-propagation sweeps over the graph. `run_label_propagation`
/// stops earlier as soon as a sweep leaves every label unchanged.
const MAX_LABEL_PROPAGATION_ITERATIONS: usize = 50;
24
/// Removes characters that can hide text or inject structure when graph content is
/// embedded into an LLM prompt.
///
/// Stripped characters:
/// - every Unicode control character (`char::is_control`, e.g. `\n`, `\r`, `\t`, NUL);
/// - U+061C ARABIC LETTER MARK and U+200E/U+200F directional marks;
/// - U+200B..=U+200D zero-width space / non-joiner / joiner;
/// - U+2028/U+2029 line and paragraph separators. These are not `is_control`, but they
///   render as line breaks and would otherwise bypass the newline stripping above;
/// - U+202A..=U+202E and U+2066..=U+2069 bidirectional embedding/override/isolate controls;
/// - U+2060..=U+2064 word joiner and invisible operators (unassigned U+2065 is dropped too);
/// - U+FEFF byte-order mark / zero-width no-break space.
///
/// All other characters, including ordinary non-ASCII text, are kept in order.
fn scrub_content(s: &str) -> String {
    s.chars()
        .filter(|&c| {
            !c.is_control()
                && !matches!(
                    c as u32,
                    0x061C | 0x200B..=0x200F | 0x2028..=0x202E | 0x2060..=0x2069 | 0xFEFF
                )
        })
        .collect()
}
43
/// Row counts reported by a single [`run_graph_eviction`] pass.
#[derive(Default, Debug)]
pub struct GraphEvictionStats {
    /// Edges removed by `GraphStore::delete_expired_edges`.
    pub expired_edges_deleted: usize,
    /// Entities removed by `GraphStore::delete_orphan_entities`.
    pub orphan_entities_deleted: usize,
    /// Entities removed by `GraphStore::cap_entities`; stays 0 when no cap is configured.
    pub capped_entities_deleted: usize,
}
51
/// Shortens `prompt` to at most `max_bytes` bytes of original content.
///
/// - `max_bytes == 0` always yields an empty string.
/// - A prompt that already fits is returned unchanged.
/// - Otherwise the text is cut at the last UTF-8 character boundary at or before
///   `max_bytes`, and `"..."` is appended. The result can therefore be up to
///   `max_bytes + 3` bytes long.
fn truncate_prompt(prompt: String, max_bytes: usize) -> String {
    match max_bytes {
        0 => String::new(),
        limit if prompt.len() <= limit => prompt,
        limit => {
            // Step back from the byte budget until we land on a char boundary, so a
            // multi-byte code point is never split (index 0 is always a boundary).
            let cut = (0..=limit)
                .rev()
                .find(|&idx| prompt.is_char_boundary(idx))
                .unwrap_or(0);
            let mut shortened = prompt;
            shortened.truncate(cut);
            shortened.push_str("...");
            shortened
        }
    }
}
67
68fn compute_partition_fingerprint(entity_ids: &[i64], intra_edge_ids: &[i64]) -> String {
74 let mut hasher = blake3::Hasher::new();
75 let mut sorted_entities = entity_ids.to_vec();
76 sorted_entities.sort_unstable();
77 hasher.update(b"entities");
78 for id in &sorted_entities {
79 hasher.update(&id.to_le_bytes());
80 }
81 let mut sorted_edges = intra_edge_ids.to_vec();
82 sorted_edges.sort_unstable();
83 hasher.update(b"edges");
84 for id in &sorted_edges {
85 hasher.update(&id.to_le_bytes());
86 }
87 hasher.finalize().to_hex().to_string()
88}
89
/// A detected community whose fingerprint was not found among the stored ones. It
/// carries everything needed to summarize and persist it.
struct CommunityData {
    /// Display name: up to three member names plus the community's position in the
    /// sorted label list, e.g. `"A, B, C [2]"`.
    name: String,
    /// Partition fingerprint over member entity ids and intra-community edge ids.
    fingerprint: String,
    /// Ids of the member entities.
    entity_ids: Vec<i64>,
    /// Scrubbed member names, following `entity_ids` order (ids without a name are skipped).
    entity_names: Vec<String>,
    /// Scrubbed facts of edges whose endpoints are both members.
    intra_facts: Vec<String>,
}
98
/// Undirected petgraph graph used for community detection: each node weight is an
/// entity id (`i64`), and edges carry no data (`()`).
type UndirectedGraph = Graph<i64, (), petgraph::Undirected>;
100
101async fn build_entity_graph_and_maps(
102 store: &GraphStore,
103 entities: &[Entity],
104 edge_chunk_size: usize,
105) -> Result<
106 (
107 UndirectedGraph,
108 HashMap<(i64, i64), Vec<String>>,
109 HashMap<(i64, i64), Vec<i64>>,
110 ),
111 MemoryError,
112> {
113 let mut graph = UndirectedGraph::new_undirected();
114 let mut node_map: HashMap<i64, NodeIndex> = HashMap::new();
115
116 for entity in entities {
117 let idx = graph.add_node(entity.id);
118 node_map.insert(entity.id, idx);
119 }
120
121 let mut edge_facts_map: HashMap<(i64, i64), Vec<String>> = HashMap::new();
122 let mut edge_id_map: HashMap<(i64, i64), Vec<i64>> = HashMap::new();
123
124 if edge_chunk_size == 0 {
125 let edges: Vec<_> = store.all_active_edges_stream().try_collect().await?;
126 for edge in &edges {
127 if let (Some(&src_idx), Some(&tgt_idx)) = (
128 node_map.get(&edge.source_entity_id),
129 node_map.get(&edge.target_entity_id),
130 ) {
131 graph.add_edge(src_idx, tgt_idx, ());
132 }
133 let key = (edge.source_entity_id, edge.target_entity_id);
134 edge_facts_map
135 .entry(key)
136 .or_default()
137 .push(edge.fact.clone());
138 edge_id_map.entry(key).or_default().push(edge.id);
139 }
140 } else {
141 let limit = i64::try_from(edge_chunk_size).unwrap_or(i64::MAX);
142 let mut last_id: i64 = 0;
143 loop {
144 let chunk = store.edges_after_id(last_id, limit).await?;
145 if chunk.is_empty() {
146 break;
147 }
148 last_id = chunk.last().expect("non-empty chunk has a last element").id;
149 for edge in &chunk {
150 if let (Some(&src_idx), Some(&tgt_idx)) = (
151 node_map.get(&edge.source_entity_id),
152 node_map.get(&edge.target_entity_id),
153 ) {
154 graph.add_edge(src_idx, tgt_idx, ());
155 }
156 let key = (edge.source_entity_id, edge.target_entity_id);
157 edge_facts_map
158 .entry(key)
159 .or_default()
160 .push(edge.fact.clone());
161 edge_id_map.entry(key).or_default().push(edge.id);
162 }
163 }
164 }
165
166 Ok((graph, edge_facts_map, edge_id_map))
167}
168
169fn run_label_propagation(graph: &UndirectedGraph) -> HashMap<usize, Vec<i64>> {
170 let mut labels: Vec<usize> = (0..graph.node_count()).collect();
171
172 for _ in 0..MAX_LABEL_PROPAGATION_ITERATIONS {
173 let mut changed = false;
174 for node_idx in graph.node_indices() {
175 let neighbors: Vec<NodeIndex> = graph.neighbors(node_idx).collect();
176 if neighbors.is_empty() {
177 continue;
178 }
179 let mut freq: HashMap<usize, usize> = HashMap::new();
180 for &nbr in &neighbors {
181 *freq.entry(labels[nbr.index()]).or_insert(0) += 1;
182 }
183 let max_count = *freq.values().max().unwrap_or(&0);
184 let best_label = freq
185 .iter()
186 .filter(|&(_, count)| *count == max_count)
187 .map(|(&label, _)| label)
188 .min()
189 .unwrap_or(labels[node_idx.index()]);
190 if labels[node_idx.index()] != best_label {
191 labels[node_idx.index()] = best_label;
192 changed = true;
193 }
194 }
195 if !changed {
196 break;
197 }
198 }
199
200 let mut communities: HashMap<usize, Vec<i64>> = HashMap::new();
201 for node_idx in graph.node_indices() {
202 let entity_id = graph[node_idx];
203 communities
204 .entry(labels[node_idx.index()])
205 .or_default()
206 .push(entity_id);
207 }
208 communities.retain(|_, members| members.len() >= 2);
209 communities
210}
211
212struct ClassifyResult {
213 to_summarize: Vec<CommunityData>,
214 unchanged_count: usize,
215 new_fingerprints: std::collections::HashSet<String>,
216}
217
218fn classify_communities(
219 communities: &HashMap<usize, Vec<i64>>,
220 edge_facts_map: &HashMap<(i64, i64), Vec<String>>,
221 edge_id_map: &HashMap<(i64, i64), Vec<i64>>,
222 entity_name_map: &HashMap<i64, &str>,
223 stored_fingerprints: &HashMap<String, i64>,
224 sorted_labels: &[usize],
225) -> ClassifyResult {
226 let mut to_summarize: Vec<CommunityData> = Vec::new();
227 let mut unchanged_count = 0usize;
228 let mut new_fingerprints: std::collections::HashSet<String> = std::collections::HashSet::new();
229
230 for (label_index, &label) in sorted_labels.iter().enumerate() {
231 let entity_ids = communities[&label].as_slice();
232 let member_set: std::collections::HashSet<i64> = entity_ids.iter().copied().collect();
233
234 let mut intra_facts: Vec<String> = Vec::new();
235 let mut intra_edge_ids: Vec<i64> = Vec::new();
236 for (&(src, tgt), facts) in edge_facts_map {
237 if member_set.contains(&src) && member_set.contains(&tgt) {
238 intra_facts.extend(facts.iter().map(|f| scrub_content(f)));
239 if let Some(ids) = edge_id_map.get(&(src, tgt)) {
240 intra_edge_ids.extend_from_slice(ids);
241 }
242 }
243 }
244
245 let fingerprint = compute_partition_fingerprint(entity_ids, &intra_edge_ids);
246 new_fingerprints.insert(fingerprint.clone());
247
248 if stored_fingerprints.contains_key(&fingerprint) {
249 unchanged_count += 1;
250 continue;
251 }
252
253 let entity_names: Vec<String> = entity_ids
254 .iter()
255 .filter_map(|id| entity_name_map.get(id).map(|&s| scrub_content(s)))
256 .collect();
257
258 let base_name = entity_names
261 .iter()
262 .take(3)
263 .cloned()
264 .collect::<Vec<_>>()
265 .join(", ");
266 let name = format!("{base_name} [{label_index}]");
267
268 to_summarize.push(CommunityData {
269 entity_ids: entity_ids.to_vec(),
270 entity_names,
271 intra_facts,
272 fingerprint,
273 name,
274 });
275 }
276
277 ClassifyResult {
278 to_summarize,
279 unchanged_count,
280 new_fingerprints,
281 }
282}
283
284async fn summarize_and_upsert_communities(
285 store: &GraphStore,
286 provider: &AnyProvider,
287 to_summarize: Vec<CommunityData>,
288 concurrency: usize,
289 community_summary_max_prompt_bytes: usize,
290) -> Result<usize, MemoryError> {
291 let semaphore = Arc::new(Semaphore::new(concurrency.max(1)));
292 let mut join_set: JoinSet<(String, String, Vec<i64>, String)> = JoinSet::new();
293
294 for data in to_summarize {
295 let provider = provider.clone();
296 let sem = Arc::clone(&semaphore);
297 let max_bytes = community_summary_max_prompt_bytes;
298 join_set.spawn(async move {
299 let _permit = sem.acquire().await.expect("semaphore is never closed");
300 let summary = match generate_community_summary(
301 &provider,
302 &data.entity_names,
303 &data.intra_facts,
304 max_bytes,
305 )
306 .await
307 {
308 Ok(text) => text,
309 Err(e) => {
310 tracing::warn!(community = %data.name, "community summary generation failed: {e:#}");
311 String::new()
312 }
313 };
314 (data.name, summary, data.entity_ids, data.fingerprint)
315 });
316 }
317
318 let mut results: Vec<(String, String, Vec<i64>, String)> = Vec::new();
320 while let Some(outcome) = join_set.join_next().await {
321 match outcome {
322 Ok(tuple) => results.push(tuple),
323 Err(e) => {
324 tracing::error!(
325 panicked = e.is_panic(),
326 cancelled = e.is_cancelled(),
327 "community summary task failed"
328 );
329 }
330 }
331 }
332
333 results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
334
335 let mut count = 0usize;
336 for (name, summary, entity_ids, fingerprint) in results {
337 store
338 .upsert_community(&name, &summary, &entity_ids, Some(&fingerprint))
339 .await?;
340 count += 1;
341 }
342
343 Ok(count)
344}
345
346pub async fn detect_communities(
365 store: &GraphStore,
366 provider: &AnyProvider,
367 community_summary_max_prompt_bytes: usize,
368 concurrency: usize,
369 edge_chunk_size: usize,
370) -> Result<usize, MemoryError> {
371 let entities = store.all_entities().await?;
372 if entities.len() < 2 {
373 return Ok(0);
374 }
375
376 let (graph, edge_facts_map, edge_id_map) =
377 build_entity_graph_and_maps(store, &entities, edge_chunk_size).await?;
378
379 let communities = run_label_propagation(&graph);
380
381 let entity_name_map: HashMap<i64, &str> =
382 entities.iter().map(|e| (e.id, e.name.as_str())).collect();
383 let stored_fingerprints = store.community_fingerprints().await?;
384
385 let mut sorted_labels: Vec<usize> = communities.keys().copied().collect();
386 sorted_labels.sort_unstable();
387
388 let ClassifyResult {
389 to_summarize,
390 unchanged_count,
391 new_fingerprints,
392 } = classify_communities(
393 &communities,
394 &edge_facts_map,
395 &edge_id_map,
396 &entity_name_map,
397 &stored_fingerprints,
398 &sorted_labels,
399 );
400
401 tracing::debug!(
402 total = sorted_labels.len(),
403 unchanged = unchanged_count,
404 to_summarize = to_summarize.len(),
405 "community detection: partition classification complete"
406 );
407
408 for (stored_fp, community_id) in &stored_fingerprints {
410 if !new_fingerprints.contains(stored_fp.as_str()) {
411 store.delete_community_by_id(*community_id).await?;
412 }
413 }
414
415 let new_count = summarize_and_upsert_communities(
416 store,
417 provider,
418 to_summarize,
419 concurrency,
420 community_summary_max_prompt_bytes,
421 )
422 .await?;
423
424 Ok(unchanged_count + new_count)
425}
426
427pub async fn assign_to_community(
438 store: &GraphStore,
439 entity_id: i64,
440) -> Result<Option<i64>, MemoryError> {
441 let edges = store.edges_for_entity(entity_id).await?;
442 if edges.is_empty() {
443 return Ok(None);
444 }
445
446 let neighbor_ids: Vec<i64> = edges
447 .iter()
448 .map(|e| {
449 if e.source_entity_id == entity_id {
450 e.target_entity_id
451 } else {
452 e.source_entity_id
453 }
454 })
455 .collect();
456
457 let mut community_votes: HashMap<i64, usize> = HashMap::new();
458 for &nbr_id in &neighbor_ids {
459 if let Some(community) = store.community_for_entity(nbr_id).await? {
460 *community_votes.entry(community.id).or_insert(0) += 1;
461 }
462 }
463
464 if community_votes.is_empty() {
465 return Ok(None);
466 }
467
468 let Some((&best_community_id, _)) =
471 community_votes
472 .iter()
473 .max_by(|&(&id_a, &count_a), &(&id_b, &count_b)| {
474 count_a.cmp(&count_b).then(id_b.cmp(&id_a))
475 })
476 else {
477 return Ok(None);
478 };
479
480 if let Some(mut target) = store.find_community_by_id(best_community_id).await? {
481 if !target.entity_ids.contains(&entity_id) {
482 target.entity_ids.push(entity_id);
483 store
484 .upsert_community(&target.name, &target.summary, &target.entity_ids, None)
485 .await?;
486 store.clear_community_fingerprint(best_community_id).await?;
488 }
489 return Ok(Some(best_community_id));
490 }
491
492 Ok(None)
493}
494
495pub async fn cleanup_stale_entity_embeddings(
503 _store: &GraphStore,
504 _embeddings: &crate::embedding_store::EmbeddingStore,
505) -> Result<usize, MemoryError> {
506 Ok(0)
511}
512
513pub async fn run_graph_eviction(
519 store: &GraphStore,
520 expired_edge_retention_days: u32,
521 max_entities: usize,
522) -> Result<GraphEvictionStats, MemoryError> {
523 let expired_edges_deleted = store
524 .delete_expired_edges(expired_edge_retention_days)
525 .await?;
526 let orphan_entities_deleted = store
527 .delete_orphan_entities(expired_edge_retention_days)
528 .await?;
529 let capped_entities_deleted = if max_entities > 0 {
530 store.cap_entities(max_entities).await?
531 } else {
532 0
533 };
534
535 Ok(GraphEvictionStats {
536 expired_edges_deleted,
537 orphan_entities_deleted,
538 capped_entities_deleted,
539 })
540}
541
542async fn generate_community_summary(
543 provider: &AnyProvider,
544 entity_names: &[String],
545 edge_facts: &[String],
546 max_prompt_bytes: usize,
547) -> Result<String, MemoryError> {
548 let entities_str = entity_names.join(", ");
549 let facts_str = edge_facts
551 .iter()
552 .take(20)
553 .map(|f| format!("- {f}"))
554 .collect::<Vec<_>>()
555 .join("\n");
556
557 let raw_prompt = format!(
558 "Summarize the following group of related entities and their relationships \
559 into a single paragraph (2-3 sentences). Focus on the theme that connects \
560 them and the key relationships.\n\nEntities: {entities_str}\n\
561 Relationships:\n{facts_str}\n\nSummary:"
562 );
563
564 let original_bytes = raw_prompt.len();
565 let truncated = raw_prompt.len() > max_prompt_bytes;
566 let prompt = truncate_prompt(raw_prompt, max_prompt_bytes);
567 if prompt.is_empty() {
568 return Ok(String::new());
569 }
570 if truncated {
571 tracing::warn!(
572 entity_count = entity_names.len(),
573 original_bytes,
574 truncated_bytes = prompt.len(),
575 "community summary prompt truncated"
576 );
577 }
578
579 let messages = [Message::from_legacy(Role::User, prompt)];
580 let response: String = provider.chat(&messages).await.map_err(MemoryError::Llm)?;
581 Ok(response)
582}
583
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;
    use crate::graph::types::EntityType;
    use crate::store::SqliteStore;

    /// Creates a `GraphStore` backed by a fresh in-memory SQLite database.
    async fn setup() -> GraphStore {
        let store = SqliteStore::new(":memory:").await.unwrap();
        GraphStore::new(store.pool().clone())
    }

    /// Default mock LLM provider for tests that do not inspect LLM traffic.
    fn mock_provider() -> AnyProvider {
        AnyProvider::Mock(zeph_llm::mock::MockProvider::default())
    }

    /// Mock provider plus its recording buffer. The tests below use the buffer length as
    /// the number of LLM calls made (presumably one entry per chat request).
    fn recording_provider() -> (
        AnyProvider,
        Arc<Mutex<Vec<Vec<zeph_llm::provider::Message>>>>,
    ) {
        let (mock, buf) = zeph_llm::mock::MockProvider::default().with_recording();
        (AnyProvider::Mock(mock), buf)
    }

    #[tokio::test]
    async fn test_detect_communities_empty_graph() {
        let store = setup().await;
        let provider = mock_provider();
        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_detect_communities_single_entity() {
        let store = setup().await;
        let provider = mock_provider();
        store
            .upsert_entity("Solo", "Solo", EntityType::Concept, None)
            .await
            .unwrap();
        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 0, "single isolated entity must not form a community");
    }

    #[tokio::test]
    async fn test_single_entity_community_filtered() {
        let store = setup().await;
        let provider = mock_provider();

        // Chain A-B-C forms one cluster; "Isolated" has no edges at all.
        let a = store
            .upsert_entity("A", "A", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("B", "B", EntityType::Concept, None)
            .await
            .unwrap();
        let c = store
            .upsert_entity("C", "C", EntityType::Concept, None)
            .await
            .unwrap();
        let iso = store
            .upsert_entity("Isolated", "Isolated", EntityType::Concept, None)
            .await
            .unwrap();

        store
            .insert_edge(a, b, "r", "A relates B", 1.0, None)
            .await
            .unwrap();
        store
            .insert_edge(b, c, "r", "B relates C", 1.0, None)
            .await
            .unwrap();

        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 1, "only the 3-entity cluster should be detected");

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 1);
        assert!(
            !communities[0].entity_ids.contains(&iso),
            "isolated entity must not be in any community"
        );
    }

    #[tokio::test]
    async fn test_label_propagation_basic() {
        let store = setup().await;
        let provider = mock_provider();

        // Four disjoint clusters, each a 3-node chain n0-n1-n2.
        let mut cluster_ids: Vec<Vec<i64>> = Vec::new();
        for cluster in 0..4_i64 {
            let mut ids = Vec::new();
            for node in 0..3_i64 {
                let name = format!("c{cluster}_n{node}");
                let id = store
                    .upsert_entity(&name, &name, EntityType::Concept, None)
                    .await
                    .unwrap();
                ids.push(id);
            }
            store
                .insert_edge(ids[0], ids[1], "r", "f", 1.0, None)
                .await
                .unwrap();
            store
                .insert_edge(ids[1], ids[2], "r", "f", 1.0, None)
                .await
                .unwrap();
            cluster_ids.push(ids);
        }

        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 4, "expected 4 communities, one per cluster");

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 4);

        // A cluster's nodes must be found in exactly one community, never split across two.
        for ids in &cluster_ids {
            let found = communities
                .iter()
                .filter(|c| ids.iter().any(|id| c.entity_ids.contains(id)))
                .count();
            assert_eq!(
                found, 1,
                "all nodes of a cluster must be in the same community"
            );
        }
    }

    #[tokio::test]
    async fn test_all_isolated_nodes() {
        let store = setup().await;
        let provider = mock_provider();

        // Five entities and no edges.
        for i in 0..5_i64 {
            store
                .upsert_entity(
                    &format!("iso_{i}"),
                    &format!("iso_{i}"),
                    EntityType::Concept,
                    None,
                )
                .await
                .unwrap();
        }

        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(count, 0, "zero-edge graph must produce no communities");
        assert_eq!(store.community_count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_eviction_expired_edges() {
        let store = setup().await;

        let a = store
            .upsert_entity("EA", "EA", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("EB", "EB", EntityType::Concept, None)
            .await
            .unwrap();
        let edge_id = store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
        store.invalidate_edge(edge_id).await.unwrap();

        // Backdate the expiry well past the 90-day retention passed below.
        zeph_db::query(sql!(
            "UPDATE graph_edges SET expired_at = datetime('now', '-200 days') WHERE id = ?1"
        ))
        .bind(edge_id)
        .execute(store.pool())
        .await
        .unwrap();

        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
        assert_eq!(stats.expired_edges_deleted, 1);
    }

    #[tokio::test]
    async fn test_eviction_orphan_entities() {
        let store = setup().await;

        let iso = store
            .upsert_entity("Orphan", "Orphan", EntityType::Concept, None)
            .await
            .unwrap();

        // Make the edge-less entity look stale relative to the 90-day retention.
        zeph_db::query(sql!(
            "UPDATE graph_entities SET last_seen_at = datetime('now', '-200 days') WHERE id = ?1"
        ))
        .bind(iso)
        .execute(store.pool())
        .await
        .unwrap();

        let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
        assert_eq!(stats.orphan_entities_deleted, 1);
    }

    #[tokio::test]
    async fn test_eviction_entity_cap() {
        let store = setup().await;

        // Five entities against a cap of three.
        for i in 0..5_i64 {
            let name = format!("cap_entity_{i}");
            store
                .upsert_entity(&name, &name, EntityType::Concept, None)
                .await
                .unwrap();
        }

        let stats = run_graph_eviction(&store, 90, 3).await.unwrap();
        assert_eq!(
            stats.capped_entities_deleted, 2,
            "should delete 5-3=2 entities"
        );
        assert_eq!(store.entity_count().await.unwrap(), 3);
    }

    #[tokio::test]
    async fn test_assign_to_community_no_neighbors() {
        let store = setup().await;
        let entity_id = store
            .upsert_entity("Loner", "Loner", EntityType::Concept, None)
            .await
            .unwrap();

        let result = assign_to_community(&store, entity_id).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_extraction_count_persistence() {
        use tempfile::NamedTempFile;
        // File-backed database so a second store can reopen the same data.
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_owned();

        let store1 = {
            let s = crate::store::SqliteStore::new(&path).await.unwrap();
            GraphStore::new(s.pool().clone())
        };

        store1.set_metadata("extraction_count", "0").await.unwrap();
        for i in 1..=5_i64 {
            store1
                .set_metadata("extraction_count", &i.to_string())
                .await
                .unwrap();
        }

        // Reopen through a fresh pool and read back the last written value.
        let store2 = {
            let s = crate::store::SqliteStore::new(&path).await.unwrap();
            GraphStore::new(s.pool().clone())
        };
        assert_eq!(store2.extraction_count().await.unwrap(), 5);
    }

    #[test]
    fn test_scrub_content_ascii_control() {
        // \n, \r, NUL, SOH and TAB are all control characters.
        let input = "hello\nworld\r\x00\x01\x09end";
        assert_eq!(scrub_content(input), "helloworldend");
    }

    #[test]
    fn test_scrub_content_bidi_overrides() {
        // LRE, RLO, LRI and PDI bidi controls must be removed.
        let input = "safe\u{202A}inject\u{202E}end\u{2066}iso\u{2069}done".to_string();
        assert_eq!(scrub_content(&input), "safeinjectendisodone");
    }

    #[test]
    fn test_scrub_content_zero_width() {
        // ZWSP, ZWNJ, ZWJ and RLM must be removed.
        let input = "a\u{200B}b\u{200C}c\u{200D}d\u{200F}e".to_string();
        assert_eq!(scrub_content(&input), "abcde");
    }

    #[test]
    fn test_scrub_content_bom() {
        let input = "\u{FEFF}hello".to_string();
        assert_eq!(scrub_content(&input), "hello");
    }

    #[test]
    fn test_scrub_content_clean_string_unchanged() {
        // Ordinary punctuation and non-ASCII text (em dash) must survive untouched.
        let input = "Hello, World! 123 — normal text.";
        assert_eq!(scrub_content(input), input);
    }

    #[test]
    fn test_truncate_prompt_within_limit() {
        let result = truncate_prompt("short".into(), 100);
        assert_eq!(result, "short");
    }

    #[test]
    fn test_truncate_prompt_zero_max_bytes() {
        let result = truncate_prompt("hello".into(), 0);
        assert_eq!(result, "");
    }

    #[test]
    fn test_truncate_prompt_long_facts() {
        let facts: Vec<String> = (0..20)
            .map(|i| format!("fact_{i}_{}", "x".repeat(20)))
            .collect();
        let prompt = facts.join("\n");
        let result = truncate_prompt(prompt, 200);
        assert!(
            result.ends_with("..."),
            "truncated prompt must end with '...'"
        );
        // 200-byte budget plus the 3-byte "..." suffix.
        assert!(result.len() <= 203);
        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
    }

    #[test]
    fn test_truncate_prompt_utf8_boundary() {
        // Each emoji is 4 bytes, so a 10-byte budget fits only 2 whole emojis.
        let prompt = "🔥".repeat(100);
        let result = truncate_prompt(prompt, 10);
        assert!(
            result.ends_with("..."),
            "truncated prompt must end with '...'"
        );
        assert_eq!(result.len(), 8 + 3, "2 emojis (8 bytes) + '...' (3 bytes)");
        assert!(std::str::from_utf8(result.as_bytes()).is_ok());
    }

    #[tokio::test]
    async fn test_assign_to_community_majority_vote() {
        let store = setup().await;

        // A and B already form a community; D is linked to both of them.
        let a = store
            .upsert_entity("AA", "AA", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("BB", "BB", EntityType::Concept, None)
            .await
            .unwrap();
        let d = store
            .upsert_entity("DD", "DD", EntityType::Concept, None)
            .await
            .unwrap();

        store
            .upsert_community("test_community", "summary", &[a, b], None)
            .await
            .unwrap();

        store.insert_edge(d, a, "r", "f", 1.0, None).await.unwrap();
        store.insert_edge(d, b, "r", "f", 1.0, None).await.unwrap();

        let result = assign_to_community(&store, d).await.unwrap();
        assert!(result.is_some());

        let returned_id = result.unwrap();
        let community = store
            .find_community_by_id(returned_id)
            .await
            .unwrap()
            .expect("returned community_id must reference an existing row");
        assert!(
            community.entity_ids.contains(&d),
            "D should be added to the community"
        );
        // Membership changed outside detection, so the partition fingerprint must be reset.
        assert!(
            community.fingerprint.is_none(),
            "fingerprint must be cleared after assign_to_community"
        );
    }

    #[tokio::test]
    async fn test_incremental_detection_no_changes_skips_llm() {
        let store = setup().await;
        let (provider, call_buf) = recording_provider();

        let a = store
            .upsert_entity("X", "X", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("Y", "Y", EntityType::Concept, None)
            .await
            .unwrap();
        store
            .insert_edge(a, b, "r", "X relates Y", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        let first_calls = call_buf.lock().unwrap().len();
        assert_eq!(first_calls, 1, "first run must produce exactly 1 LLM call");

        // Same graph again: the stored fingerprint matches, so no summary is regenerated.
        detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        let second_calls = call_buf.lock().unwrap().len();
        assert_eq!(
            second_calls, first_calls,
            "second run with no graph changes must produce 0 additional LLM calls"
        );
    }

    #[tokio::test]
    async fn test_incremental_detection_edge_change_triggers_resummary() {
        let store = setup().await;
        let (provider, call_buf) = recording_provider();

        let a = store
            .upsert_entity("P", "P", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("Q", "Q", EntityType::Concept, None)
            .await
            .unwrap();
        store
            .insert_edge(a, b, "r", "P relates Q", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        let after_first = call_buf.lock().unwrap().len();
        assert_eq!(after_first, 1);

        // A new intra-community edge changes the edge-id set and hence the fingerprint.
        store
            .insert_edge(b, a, "r2", "Q also relates P", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        let after_second = call_buf.lock().unwrap().len();
        assert_eq!(
            after_second, 2,
            "edge change must trigger one additional LLM call"
        );
    }

    #[tokio::test]
    async fn test_incremental_detection_dissolved_community_deleted() {
        let store = setup().await;
        let provider = mock_provider();

        let a = store
            .upsert_entity("M1", "M1", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("M2", "M2", EntityType::Concept, None)
            .await
            .unwrap();
        let edge_id = store
            .insert_edge(a, b, "r", "M1 relates M2", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(store.community_count().await.unwrap(), 1);

        // Invalidating the only edge is expected to leave both entities isolated.
        store.invalidate_edge(edge_id).await.unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(
            store.community_count().await.unwrap(),
            0,
            "dissolved community must be deleted on next refresh"
        );
    }

    #[tokio::test]
    async fn test_detect_communities_concurrency_one() {
        let store = setup().await;
        let provider = mock_provider();

        let a = store
            .upsert_entity("C1A", "C1A", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("C1B", "C1B", EntityType::Concept, None)
            .await
            .unwrap();
        store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();

        let count = detect_communities(&store, &provider, usize::MAX, 1, 0)
            .await
            .unwrap();
        assert_eq!(count, 1, "concurrency=1 must still detect the community");
        assert_eq!(store.community_count().await.unwrap(), 1);
    }

    #[test]
    fn test_compute_fingerprint_deterministic() {
        let fp1 = compute_partition_fingerprint(&[1, 2, 3], &[10, 20]);
        let fp2 = compute_partition_fingerprint(&[3, 1, 2], &[20, 10]);
        assert_eq!(fp1, fp2, "fingerprint must be order-independent");

        let fp3 = compute_partition_fingerprint(&[1, 2, 3], &[10, 30]);
        assert_ne!(
            fp1, fp3,
            "different edge IDs must produce different fingerprint"
        );

        let fp4 = compute_partition_fingerprint(&[1, 2, 4], &[10, 20]);
        assert_ne!(
            fp1, fp4,
            "different entity IDs must produce different fingerprint"
        );
    }

    #[test]
    fn test_compute_fingerprint_domain_separation() {
        // Same concatenated id sequence [1, 2, 3], split differently between the lists.
        let fp_a = compute_partition_fingerprint(&[1, 2], &[3]);
        let fp_b = compute_partition_fingerprint(&[1], &[2, 3]);
        assert_ne!(
            fp_a, fp_b,
            "entity/edge sequences with same raw bytes must produce different fingerprints"
        );
    }

    #[tokio::test]
    async fn test_detect_communities_chunked_correct_membership() {
        let store = setup().await;
        let provider = mock_provider();

        // Two clusters: chain alpha-beta-gamma and pair delta-epsilon.
        let node_alpha = store
            .upsert_entity("CA", "CA", EntityType::Concept, None)
            .await
            .unwrap();
        let node_beta = store
            .upsert_entity("CB", "CB", EntityType::Concept, None)
            .await
            .unwrap();
        let node_gamma = store
            .upsert_entity("CC", "CC", EntityType::Concept, None)
            .await
            .unwrap();
        let node_delta = store
            .upsert_entity("CD", "CD", EntityType::Concept, None)
            .await
            .unwrap();
        let node_epsilon = store
            .upsert_entity("CE", "CE", EntityType::Concept, None)
            .await
            .unwrap();

        store
            .insert_edge(node_alpha, node_beta, "r", "A-B fact", 1.0, None)
            .await
            .unwrap();
        store
            .insert_edge(node_beta, node_gamma, "r", "B-C fact", 1.0, None)
            .await
            .unwrap();
        store
            .insert_edge(node_delta, node_epsilon, "r", "D-E fact", 1.0, None)
            .await
            .unwrap();

        // edge_chunk_size = 1: every edge is loaded in its own page.
        let count_chunked = detect_communities(&store, &provider, usize::MAX, 4, 1)
            .await
            .unwrap();
        assert_eq!(
            count_chunked, 2,
            "chunked loading must detect both communities"
        );

        let communities = store.all_communities().await.unwrap();
        assert_eq!(communities.len(), 2);

        let abc_ids = [node_alpha, node_beta, node_gamma];
        let de_ids = [node_delta, node_epsilon];
        let has_abc = communities
            .iter()
            .any(|comm| abc_ids.iter().all(|id| comm.entity_ids.contains(id)));
        let has_de = communities
            .iter()
            .any(|comm| de_ids.iter().all(|id| comm.entity_ids.contains(id)));
        assert!(has_abc, "cluster A-B-C must form a community");
        assert!(has_de, "cluster D-E must form a community");
    }

    #[tokio::test]
    async fn test_detect_communities_chunk_size_max() {
        let store = setup().await;
        let provider = mock_provider();

        let x = store
            .upsert_entity("MX", "MX", EntityType::Concept, None)
            .await
            .unwrap();
        let y = store
            .upsert_entity("MY", "MY", EntityType::Concept, None)
            .await
            .unwrap();
        store
            .insert_edge(x, y, "r", "X-Y fact", 1.0, None)
            .await
            .unwrap();

        // usize::MAX does not fit in i64, so this exercises the clamp to i64::MAX.
        let count = detect_communities(&store, &provider, usize::MAX, 4, usize::MAX)
            .await
            .unwrap();
        assert_eq!(count, 1, "chunk_size=usize::MAX must detect the community");
    }

    #[tokio::test]
    async fn test_detect_communities_chunk_size_zero_fallback() {
        let store = setup().await;
        let provider = mock_provider();

        let p = store
            .upsert_entity("ZP", "ZP", EntityType::Concept, None)
            .await
            .unwrap();
        let q = store
            .upsert_entity("ZQ", "ZQ", EntityType::Concept, None)
            .await
            .unwrap();
        store
            .insert_edge(p, q, "r", "P-Q fact", 1.0, None)
            .await
            .unwrap();

        let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
            .await
            .unwrap();
        assert_eq!(
            count, 1,
            "chunk_size=0 must detect the community via stream fallback"
        );
    }

    #[tokio::test]
    async fn test_detect_communities_chunked_edge_map_complete() {
        let store = setup().await;
        let (provider, call_buf) = recording_provider();

        let a = store
            .upsert_entity("FA", "FA", EntityType::Concept, None)
            .await
            .unwrap();
        let b = store
            .upsert_entity("FB", "FB", EntityType::Concept, None)
            .await
            .unwrap();
        store
            .insert_edge(a, b, "r", "edge1 fact", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 1)
            .await
            .unwrap();
        let calls_after_first = call_buf.lock().unwrap().len();
        assert_eq!(calls_after_first, 1, "first run must trigger 1 LLM call");

        // The second edge lands in a later page; it must still reach the edge-id map,
        // otherwise the fingerprint would not change.
        store
            .insert_edge(b, a, "r2", "edge2 fact", 1.0, None)
            .await
            .unwrap();

        detect_communities(&store, &provider, usize::MAX, 4, 1)
            .await
            .unwrap();
        let calls_after_second = call_buf.lock().unwrap().len();
        assert_eq!(
            calls_after_second, 2,
            "adding an edge must change fingerprint and trigger re-summarization"
        );
    }
}