1use std::collections::HashMap;
5use std::sync::Arc;
6#[allow(unused_imports)]
7use zeph_db::sql;
8
9use futures::TryStreamExt as _;
10use petgraph::Graph;
11use petgraph::graph::NodeIndex;
12use tokio::sync::Semaphore;
13use tokio::task::JoinSet;
14use zeph_llm::LlmProvider as _;
15use zeph_llm::any::AnyProvider;
16use zeph_llm::provider::{Message, Role};
17
18use crate::error::MemoryError;
19
20use super::store::GraphStore;
21use super::types::Entity;
22
/// Upper bound on the number of full sweeps over the graph that label propagation performs
/// before it stops, even if labels are still changing.
const MAX_LABEL_PROPAGATION_ITERATIONS: usize = 50;
24
/// Returns `s` with invisible and control characters removed.
///
/// Dropped characters are:
/// * every Unicode control character (this includes `\n`, `\r`, `\t` and NUL),
/// * U+200B–U+200F (zero-width characters and directional marks),
/// * U+202A–U+202E (bidirectional embeddings and overrides),
/// * U+2066–U+2069 (bidirectional isolates),
/// * U+FEFF (byte-order mark).
///
/// All remaining characters are kept in their original order.
fn scrub_content(s: &str) -> String {
    let mut cleaned = String::new();
    for ch in s.chars() {
        let is_invisible_format = matches!(
            ch,
            '\u{200B}'..='\u{200F}'
                | '\u{202A}'..='\u{202E}'
                | '\u{2066}'..='\u{2069}'
                | '\u{FEFF}'
        );
        if ch.is_control() || is_invisible_format {
            continue;
        }
        cleaned.push(ch);
    }
    cleaned
}
43
/// Counters describing what a single graph eviction pass removed.
///
/// `Default` yields all-zero counters. `Clone`, `PartialEq` and `Eq` are derived so callers
/// can keep a copy of the stats and compare them directly (for example in assertions).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct GraphEvictionStats {
    /// Number of expired edges deleted.
    pub expired_edges_deleted: usize,
    /// Number of orphan entities deleted.
    pub orphan_entities_deleted: usize,
    /// Number of entities deleted to enforce the entity cap (0 when no cap is applied).
    pub capped_entities_deleted: usize,
}
51
/// Limits `prompt` to roughly `max_bytes` bytes without splitting a UTF-8 character.
///
/// * `max_bytes == 0` yields an empty string.
/// * A prompt that already fits within `max_bytes` is returned unchanged.
/// * Otherwise the prompt is cut at the last character boundary at or below `max_bytes`
///   and `"..."` is appended, so the result can be up to three bytes longer than
///   `max_bytes`.
fn truncate_prompt(prompt: String, max_bytes: usize) -> String {
    match max_bytes {
        0 => String::new(),
        limit if prompt.len() <= limit => prompt,
        limit => {
            // Walk back to the nearest character boundary; index 0 always is one, and
            // `limit < prompt.len()` here, so the loop terminates within the string.
            let mut cut = limit;
            while !prompt.is_char_boundary(cut) {
                cut -= 1;
            }
            let mut shortened = prompt;
            shortened.truncate(cut);
            shortened.push_str("...");
            shortened
        }
    }
}
67
68fn compute_partition_fingerprint(entity_ids: &[i64], intra_edge_ids: &[i64]) -> String {
74 let mut hasher = blake3::Hasher::new();
75 let mut sorted_entities = entity_ids.to_vec();
76 sorted_entities.sort_unstable();
77 hasher.update(b"entities");
78 for id in &sorted_entities {
79 hasher.update(&id.to_le_bytes());
80 }
81 let mut sorted_edges = intra_edge_ids.to_vec();
82 sorted_edges.sort_unstable();
83 hasher.update(b"edges");
84 for id in &sorted_edges {
85 hasher.update(&id.to_le_bytes());
86 }
87 hasher.finalize().to_hex().to_string()
88}
89
/// Everything needed to summarize and persist one community.
struct CommunityData {
    /// IDs of the entities that belong to the community.
    entity_ids: Vec<i64>,
    /// Names of the member entities.
    entity_names: Vec<String>,
    /// Facts taken from edges within the community.
    intra_facts: Vec<String>,
    /// Fingerprint identifying this exact partition.
    fingerprint: String,
    /// Name under which the community is stored.
    name: String,
}
98
/// Undirected petgraph graph whose node weights are `i64` values and whose edges carry no
/// weight.
type UndirectedGraph = Graph<i64, (), petgraph::Undirected>;
100
101async fn build_entity_graph_and_maps(
102 store: &GraphStore,
103 entities: &[Entity],
104 edge_chunk_size: usize,
105) -> Result<
106 (
107 UndirectedGraph,
108 HashMap<(i64, i64), Vec<String>>,
109 HashMap<(i64, i64), Vec<i64>>,
110 ),
111 MemoryError,
112> {
113 let mut graph = UndirectedGraph::new_undirected();
114 let mut node_map: HashMap<i64, NodeIndex> = HashMap::new();
115
116 for entity in entities {
117 let idx = graph.add_node(entity.id.0);
118 node_map.insert(entity.id.0, idx);
119 }
120
121 let mut edge_facts_map: HashMap<(i64, i64), Vec<String>> = HashMap::new();
122 let mut edge_id_map: HashMap<(i64, i64), Vec<i64>> = HashMap::new();
123
124 if edge_chunk_size == 0 {
125 let edges: Vec<_> = store.all_active_edges_stream().try_collect().await?;
126 for edge in &edges {
127 if let (Some(&src_idx), Some(&tgt_idx)) = (
128 node_map.get(&edge.source_entity_id),
129 node_map.get(&edge.target_entity_id),
130 ) {
131 graph.add_edge(src_idx, tgt_idx, ());
132 }
133 let key = (edge.source_entity_id, edge.target_entity_id);
134 edge_facts_map
135 .entry(key)
136 .or_default()
137 .push(edge.fact.clone());
138 edge_id_map.entry(key).or_default().push(edge.id);
139 }
140 } else {
141 let limit = i64::try_from(edge_chunk_size).unwrap_or(i64::MAX);
142 let mut last_id: i64 = 0;
143 loop {
144 let chunk = store.edges_after_id(last_id, limit).await?;
145 if chunk.is_empty() {
146 break;
147 }
148 last_id = chunk.last().expect("non-empty chunk has a last element").id;
149 for edge in &chunk {
150 if let (Some(&src_idx), Some(&tgt_idx)) = (
151 node_map.get(&edge.source_entity_id),
152 node_map.get(&edge.target_entity_id),
153 ) {
154 graph.add_edge(src_idx, tgt_idx, ());
155 }
156 let key = (edge.source_entity_id, edge.target_entity_id);
157 edge_facts_map
158 .entry(key)
159 .or_default()
160 .push(edge.fact.clone());
161 edge_id_map.entry(key).or_default().push(edge.id);
162 }
163 }
164 }
165
166 Ok((graph, edge_facts_map, edge_id_map))
167}
168
169fn run_label_propagation(graph: &UndirectedGraph) -> HashMap<usize, Vec<i64>> {
170 let mut labels: Vec<usize> = (0..graph.node_count()).collect();
171
172 for _ in 0..MAX_LABEL_PROPAGATION_ITERATIONS {
173 let mut changed = false;
174 for node_idx in graph.node_indices() {
175 let neighbors: Vec<NodeIndex> = graph.neighbors(node_idx).collect();
176 if neighbors.is_empty() {
177 continue;
178 }
179 let mut freq: HashMap<usize, usize> = HashMap::new();
180 for &nbr in &neighbors {
181 *freq.entry(labels[nbr.index()]).or_insert(0) += 1;
182 }
183 let max_count = *freq.values().max().unwrap_or(&0);
184 let best_label = freq
185 .iter()
186 .filter(|&(_, count)| *count == max_count)
187 .map(|(&label, _)| label)
188 .min()
189 .unwrap_or(labels[node_idx.index()]);
190 if labels[node_idx.index()] != best_label {
191 labels[node_idx.index()] = best_label;
192 changed = true;
193 }
194 }
195 if !changed {
196 break;
197 }
198 }
199
200 let mut communities: HashMap<usize, Vec<i64>> = HashMap::new();
201 for node_idx in graph.node_indices() {
202 let entity_id = graph[node_idx];
203 communities
204 .entry(labels[node_idx.index()])
205 .or_default()
206 .push(entity_id);
207 }
208 communities.retain(|_, members| members.len() >= 2);
209 communities
210}
211
/// Outcome of comparing the freshly detected communities with the stored fingerprints.
struct ClassifyResult {
    /// Communities whose fingerprint is not stored yet and therefore need a summary.
    to_summarize: Vec<CommunityData>,
    /// Number of communities whose fingerprint is already stored.
    unchanged_count: usize,
    /// Fingerprints of all communities seen in this run, changed or not.
    new_fingerprints: std::collections::HashSet<String>,
}
217
218fn classify_communities(
219 communities: &HashMap<usize, Vec<i64>>,
220 edge_facts_map: &HashMap<(i64, i64), Vec<String>>,
221 edge_id_map: &HashMap<(i64, i64), Vec<i64>>,
222 entity_name_map: &HashMap<i64, &str>,
223 stored_fingerprints: &HashMap<String, i64>,
224 sorted_labels: &[usize],
225) -> ClassifyResult {
226 let mut to_summarize: Vec<CommunityData> = Vec::new();
227 let mut unchanged_count = 0usize;
228 let mut new_fingerprints: std::collections::HashSet<String> = std::collections::HashSet::new();
229
230 for (label_index, &label) in sorted_labels.iter().enumerate() {
231 let entity_ids = communities[&label].as_slice();
232 let member_set: std::collections::HashSet<i64> = entity_ids.iter().copied().collect();
233
234 let mut intra_facts: Vec<String> = Vec::new();
235 let mut intra_edge_ids: Vec<i64> = Vec::new();
236 for (&(src, tgt), facts) in edge_facts_map {
237 if member_set.contains(&src) && member_set.contains(&tgt) {
238 intra_facts.extend(facts.iter().map(|f| scrub_content(f)));
239 if let Some(ids) = edge_id_map.get(&(src, tgt)) {
240 intra_edge_ids.extend_from_slice(ids);
241 }
242 }
243 }
244
245 let fingerprint = compute_partition_fingerprint(entity_ids, &intra_edge_ids);
246 new_fingerprints.insert(fingerprint.clone());
247
248 if stored_fingerprints.contains_key(&fingerprint) {
249 unchanged_count += 1;
250 continue;
251 }
252
253 let entity_names: Vec<String> = entity_ids
254 .iter()
255 .filter_map(|id| entity_name_map.get(id).map(|&s| scrub_content(s)))
256 .collect();
257
258 let base_name = entity_names
261 .iter()
262 .take(3)
263 .cloned()
264 .collect::<Vec<_>>()
265 .join(", ");
266 let name = format!("{base_name} [{label_index}]");
267
268 to_summarize.push(CommunityData {
269 entity_ids: entity_ids.to_vec(),
270 entity_names,
271 intra_facts,
272 fingerprint,
273 name,
274 });
275 }
276
277 ClassifyResult {
278 to_summarize,
279 unchanged_count,
280 new_fingerprints,
281 }
282}
283
284async fn summarize_and_upsert_communities(
285 store: &GraphStore,
286 provider: &AnyProvider,
287 to_summarize: Vec<CommunityData>,
288 concurrency: usize,
289 community_summary_max_prompt_bytes: usize,
290) -> Result<usize, MemoryError> {
291 let semaphore = Arc::new(Semaphore::new(concurrency.max(1)));
292 let mut join_set: JoinSet<(String, String, Vec<i64>, String)> = JoinSet::new();
293
294 for data in to_summarize {
295 let provider = provider.clone();
296 let sem = Arc::clone(&semaphore);
297 let max_bytes = community_summary_max_prompt_bytes;
298 join_set.spawn(async move {
299 let _permit = sem.acquire().await.expect("semaphore is never closed");
300 let summary = match generate_community_summary(
301 &provider,
302 &data.entity_names,
303 &data.intra_facts,
304 max_bytes,
305 )
306 .await
307 {
308 Ok(text) => text,
309 Err(e) => {
310 tracing::warn!(community = %data.name, "community summary generation failed: {e:#}");
311 String::new()
312 }
313 };
314 (data.name, summary, data.entity_ids, data.fingerprint)
315 });
316 }
317
318 let mut results: Vec<(String, String, Vec<i64>, String)> = Vec::new();
320 while let Some(outcome) = join_set.join_next().await {
321 match outcome {
322 Ok(tuple) => results.push(tuple),
323 Err(e) => {
324 tracing::error!(
325 panicked = e.is_panic(),
326 cancelled = e.is_cancelled(),
327 "community summary task failed"
328 );
329 }
330 }
331 }
332
333 results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
334
335 let mut count = 0usize;
336 for (name, summary, entity_ids, fingerprint) in results {
337 store
338 .upsert_community(&name, &summary, &entity_ids, Some(&fingerprint))
339 .await?;
340 count += 1;
341 }
342
343 Ok(count)
344}
345
346pub async fn detect_communities(
365 store: &GraphStore,
366 provider: &AnyProvider,
367 community_summary_max_prompt_bytes: usize,
368 concurrency: usize,
369 edge_chunk_size: usize,
370) -> Result<usize, MemoryError> {
371 let edge_chunk_size = if edge_chunk_size == 0 {
372 tracing::warn!(
373 "edge_chunk_size is 0, which would load all edges into memory; \
374 using safe default of 10_000"
375 );
376 10_000_usize
377 } else {
378 edge_chunk_size
379 };
380
381 let entities = store.all_entities().await?;
382 if entities.len() < 2 {
383 return Ok(0);
384 }
385
386 let (graph, edge_facts_map, edge_id_map) =
387 build_entity_graph_and_maps(store, &entities, edge_chunk_size).await?;
388
389 let communities = run_label_propagation(&graph);
390
391 let entity_name_map: HashMap<i64, &str> =
392 entities.iter().map(|e| (e.id.0, e.name.as_str())).collect();
393 let stored_fingerprints = store.community_fingerprints().await?;
394
395 let mut sorted_labels: Vec<usize> = communities.keys().copied().collect();
396 sorted_labels.sort_unstable();
397
398 let ClassifyResult {
399 to_summarize,
400 unchanged_count,
401 new_fingerprints,
402 } = classify_communities(
403 &communities,
404 &edge_facts_map,
405 &edge_id_map,
406 &entity_name_map,
407 &stored_fingerprints,
408 &sorted_labels,
409 );
410
411 tracing::debug!(
412 total = sorted_labels.len(),
413 unchanged = unchanged_count,
414 to_summarize = to_summarize.len(),
415 "community detection: partition classification complete"
416 );
417
418 for (stored_fp, community_id) in &stored_fingerprints {
420 if !new_fingerprints.contains(stored_fp.as_str()) {
421 store.delete_community_by_id(*community_id).await?;
422 }
423 }
424
425 let new_count = summarize_and_upsert_communities(
426 store,
427 provider,
428 to_summarize,
429 concurrency,
430 community_summary_max_prompt_bytes,
431 )
432 .await?;
433
434 Ok(unchanged_count + new_count)
435}
436
437pub async fn assign_to_community(
448 store: &GraphStore,
449 entity_id: i64,
450) -> Result<Option<i64>, MemoryError> {
451 let edges = store.edges_for_entity(entity_id).await?;
452 if edges.is_empty() {
453 return Ok(None);
454 }
455
456 let neighbor_ids: Vec<i64> = edges
457 .iter()
458 .map(|e| {
459 if e.source_entity_id == entity_id {
460 e.target_entity_id
461 } else {
462 e.source_entity_id
463 }
464 })
465 .collect();
466
467 let mut community_votes: HashMap<i64, usize> = HashMap::new();
468 for &nbr_id in &neighbor_ids {
469 if let Some(community) = store.community_for_entity(nbr_id).await? {
470 *community_votes.entry(community.id).or_insert(0) += 1;
471 }
472 }
473
474 if community_votes.is_empty() {
475 return Ok(None);
476 }
477
478 let Some((&best_community_id, _)) =
481 community_votes
482 .iter()
483 .max_by(|&(&id_a, &count_a), &(&id_b, &count_b)| {
484 count_a.cmp(&count_b).then(id_b.cmp(&id_a))
485 })
486 else {
487 return Ok(None);
488 };
489
490 if let Some(mut target) = store.find_community_by_id(best_community_id).await? {
491 if !target.entity_ids.iter().any(|eid| eid.0 == entity_id) {
492 target.entity_ids.push(crate::types::EntityId(entity_id));
493 let raw_ids: Vec<i64> = target.entity_ids.iter().map(|eid| eid.0).collect();
494 store
495 .upsert_community(&target.name, &target.summary, &raw_ids, None)
496 .await?;
497 store.clear_community_fingerprint(best_community_id).await?;
499 }
500 return Ok(Some(best_community_id));
501 }
502
503 Ok(None)
504}
505
506pub async fn cleanup_stale_entity_embeddings(
514 store: &GraphStore,
515 embeddings: &crate::embedding_store::EmbeddingStore,
516) -> Result<usize, MemoryError> {
517 const ENTITY_COLLECTION: &str = "zeph_graph_entities";
518
519 let pairs = embeddings.scroll_all_entity_ids(ENTITY_COLLECTION).await?;
523 if pairs.is_empty() {
524 return Ok(0);
525 }
526
527 let qdrant_ids: Vec<i64> = pairs.iter().map(|(_, eid)| *eid).collect();
528 let live: std::collections::HashSet<i64> = store
529 .entity_ids_in(&qdrant_ids)
530 .await?
531 .into_iter()
532 .collect();
533
534 let stale_point_ids: Vec<String> = pairs
535 .into_iter()
536 .filter_map(|(pid, eid)| (!live.contains(&eid)).then_some(pid))
537 .collect();
538
539 if stale_point_ids.is_empty() {
540 return Ok(0);
541 }
542
543 let count = stale_point_ids.len();
544 embeddings
545 .delete_from_collection(ENTITY_COLLECTION, stale_point_ids)
546 .await?;
547 Ok(count)
548}
549
550pub async fn run_graph_eviction(
556 store: &GraphStore,
557 expired_edge_retention_days: u32,
558 max_entities: usize,
559) -> Result<GraphEvictionStats, MemoryError> {
560 let expired_edges_deleted = store
561 .delete_expired_edges(expired_edge_retention_days)
562 .await?;
563 let orphan_entities_deleted = store
564 .delete_orphan_entities(expired_edge_retention_days)
565 .await?;
566 let capped_entities_deleted = if max_entities > 0 {
567 store.cap_entities(max_entities).await?
568 } else {
569 0
570 };
571
572 Ok(GraphEvictionStats {
573 expired_edges_deleted,
574 orphan_entities_deleted,
575 capped_entities_deleted,
576 })
577}
578
579async fn generate_community_summary(
580 provider: &AnyProvider,
581 entity_names: &[String],
582 edge_facts: &[String],
583 max_prompt_bytes: usize,
584) -> Result<String, MemoryError> {
585 let entities_str = entity_names.join(", ");
586 let facts_str = edge_facts
588 .iter()
589 .take(20)
590 .map(|f| format!("- {f}"))
591 .collect::<Vec<_>>()
592 .join("\n");
593
594 let raw_prompt = format!(
595 "Summarize the following group of related entities and their relationships \
596 into a single paragraph (2-3 sentences). Focus on the theme that connects \
597 them and the key relationships.\n\nEntities: {entities_str}\n\
598 Relationships:\n{facts_str}\n\nSummary:"
599 );
600
601 let original_bytes = raw_prompt.len();
602 let truncated = raw_prompt.len() > max_prompt_bytes;
603 let prompt = truncate_prompt(raw_prompt, max_prompt_bytes);
604 if prompt.is_empty() {
605 return Ok(String::new());
606 }
607 if truncated {
608 tracing::warn!(
609 entity_count = entity_names.len(),
610 original_bytes,
611 truncated_bytes = prompt.len(),
612 "community summary prompt truncated"
613 );
614 }
615
616 let messages = [Message::from_legacy(Role::User, prompt)];
617 let response: String = provider.chat(&messages).await.map_err(MemoryError::Llm)?;
618 Ok(response)
619}
620
621#[cfg(test)]
622mod tests {
623 use std::sync::{Arc, Mutex};
624
625 use super::*;
626 use crate::graph::types::EntityType;
627 use crate::store::SqliteStore;
628
629 async fn setup() -> GraphStore {
630 let store = SqliteStore::new(":memory:").await.unwrap();
631 GraphStore::new(store.pool().clone())
632 }
633
634 fn mock_provider() -> AnyProvider {
635 AnyProvider::Mock(zeph_llm::mock::MockProvider::default())
636 }
637
638 fn recording_provider() -> (
639 AnyProvider,
640 Arc<Mutex<Vec<Vec<zeph_llm::provider::Message>>>>,
641 ) {
642 let (mock, buf) = zeph_llm::mock::MockProvider::default().with_recording();
643 (AnyProvider::Mock(mock), buf)
644 }
645
646 #[tokio::test]
647 async fn test_detect_communities_empty_graph() {
648 let store = setup().await;
649 let provider = mock_provider();
650 let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
651 .await
652 .unwrap();
653 assert_eq!(count, 0);
654 }
655
656 #[tokio::test]
657 async fn test_detect_communities_single_entity() {
658 let store = setup().await;
659 let provider = mock_provider();
660 store
661 .upsert_entity("Solo", "Solo", EntityType::Concept, None)
662 .await
663 .unwrap();
664 let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
665 .await
666 .unwrap();
667 assert_eq!(count, 0, "single isolated entity must not form a community");
668 }
669
670 #[tokio::test]
671 async fn test_single_entity_community_filtered() {
672 let store = setup().await;
673 let provider = mock_provider();
674
675 let a = store
677 .upsert_entity("A", "A", EntityType::Concept, None)
678 .await
679 .unwrap()
680 .0;
681 let b = store
682 .upsert_entity("B", "B", EntityType::Concept, None)
683 .await
684 .unwrap()
685 .0;
686 let c = store
687 .upsert_entity("C", "C", EntityType::Concept, None)
688 .await
689 .unwrap()
690 .0;
691 let iso = store
692 .upsert_entity("Isolated", "Isolated", EntityType::Concept, None)
693 .await
694 .unwrap()
695 .0;
696
697 store
698 .insert_edge(a, b, "r", "A relates B", 1.0, None)
699 .await
700 .unwrap();
701 store
702 .insert_edge(b, c, "r", "B relates C", 1.0, None)
703 .await
704 .unwrap();
705
706 let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
707 .await
708 .unwrap();
709 assert_eq!(count, 1, "only the 3-entity cluster should be detected");
711
712 let communities = store.all_communities().await.unwrap();
713 assert_eq!(communities.len(), 1);
714 assert!(
715 !communities[0].entity_ids.iter().any(|eid| eid.0 == iso),
716 "isolated entity must not be in any community"
717 );
718 }
719
720 #[tokio::test]
721 async fn test_label_propagation_basic() {
722 let store = setup().await;
723 let provider = mock_provider();
724
725 let mut cluster_ids: Vec<Vec<i64>> = Vec::new();
727 for cluster in 0..4_i64 {
728 let mut ids = Vec::new();
729 for node in 0..3_i64 {
730 let name = format!("c{cluster}_n{node}");
731 let id = store
732 .upsert_entity(&name, &name, EntityType::Concept, None)
733 .await
734 .unwrap()
735 .0;
736 ids.push(id);
737 }
738 store
740 .insert_edge(ids[0], ids[1], "r", "f", 1.0, None)
741 .await
742 .unwrap();
743 store
744 .insert_edge(ids[1], ids[2], "r", "f", 1.0, None)
745 .await
746 .unwrap();
747 cluster_ids.push(ids);
748 }
749
750 let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
751 .await
752 .unwrap();
753 assert_eq!(count, 4, "expected 4 communities, one per cluster");
754
755 let communities = store.all_communities().await.unwrap();
756 assert_eq!(communities.len(), 4);
757
758 for ids in &cluster_ids {
760 let found = communities
761 .iter()
762 .filter(|c| {
763 ids.iter()
764 .any(|id| c.entity_ids.iter().any(|eid| eid.0 == *id))
765 })
766 .count();
767 assert_eq!(
768 found, 1,
769 "all nodes of a cluster must be in the same community"
770 );
771 }
772 }
773
774 #[tokio::test]
775 async fn test_all_isolated_nodes() {
776 let store = setup().await;
777 let provider = mock_provider();
778
779 for i in 0..5_i64 {
781 store
782 .upsert_entity(
783 &format!("iso_{i}"),
784 &format!("iso_{i}"),
785 EntityType::Concept,
786 None,
787 )
788 .await
789 .unwrap();
790 }
791
792 let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
793 .await
794 .unwrap();
795 assert_eq!(count, 0, "zero-edge graph must produce no communities");
796 assert_eq!(store.community_count().await.unwrap(), 0);
797 }
798
799 #[tokio::test]
800 async fn test_eviction_expired_edges() {
801 let store = setup().await;
802
803 let a = store
804 .upsert_entity("EA", "EA", EntityType::Concept, None)
805 .await
806 .unwrap()
807 .0;
808 let b = store
809 .upsert_entity("EB", "EB", EntityType::Concept, None)
810 .await
811 .unwrap()
812 .0;
813 let edge_id = store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
814 store.invalidate_edge(edge_id).await.unwrap();
815
816 zeph_db::query(sql!(
818 "UPDATE graph_edges SET expired_at = datetime('now', '-200 days') WHERE id = ?1"
819 ))
820 .bind(edge_id)
821 .execute(store.pool())
822 .await
823 .unwrap();
824
825 let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
826 assert_eq!(stats.expired_edges_deleted, 1);
827 }
828
829 #[tokio::test]
830 async fn test_eviction_orphan_entities() {
831 let store = setup().await;
832
833 let iso = store
834 .upsert_entity("Orphan", "Orphan", EntityType::Concept, None)
835 .await
836 .unwrap()
837 .0;
838
839 zeph_db::query(sql!(
841 "UPDATE graph_entities SET last_seen_at = datetime('now', '-200 days') WHERE id = ?1"
842 ))
843 .bind(iso)
844 .execute(store.pool())
845 .await
846 .unwrap();
847
848 let stats = run_graph_eviction(&store, 90, 0).await.unwrap();
849 assert_eq!(stats.orphan_entities_deleted, 1);
850 }
851
852 #[tokio::test]
853 async fn test_eviction_entity_cap() {
854 let store = setup().await;
855
856 for i in 0..5_i64 {
858 let name = format!("cap_entity_{i}");
859 store
860 .upsert_entity(&name, &name, EntityType::Concept, None)
861 .await
862 .unwrap();
863 }
864
865 let stats = run_graph_eviction(&store, 90, 3).await.unwrap();
866 assert_eq!(
867 stats.capped_entities_deleted, 2,
868 "should delete 5-3=2 entities"
869 );
870 assert_eq!(store.entity_count().await.unwrap(), 3);
871 }
872
873 #[tokio::test]
874 async fn test_assign_to_community_no_neighbors() {
875 let store = setup().await;
876 let entity_id = store
877 .upsert_entity("Loner", "Loner", EntityType::Concept, None)
878 .await
879 .unwrap()
880 .0;
881
882 let result = assign_to_community(&store, entity_id).await.unwrap();
883 assert!(result.is_none());
884 }
885
886 #[tokio::test]
887 async fn test_extraction_count_persistence() {
888 use tempfile::NamedTempFile;
889 let tmp = NamedTempFile::new().unwrap();
891 let path = tmp.path().to_str().unwrap().to_owned();
892
893 let store1 = {
894 let s = crate::store::SqliteStore::new(&path).await.unwrap();
895 GraphStore::new(s.pool().clone())
896 };
897
898 store1.set_metadata("extraction_count", "0").await.unwrap();
899 for i in 1..=5_i64 {
900 store1
901 .set_metadata("extraction_count", &i.to_string())
902 .await
903 .unwrap();
904 }
905
906 let store2 = {
908 let s = crate::store::SqliteStore::new(&path).await.unwrap();
909 GraphStore::new(s.pool().clone())
910 };
911 assert_eq!(store2.extraction_count().await.unwrap(), 5);
912 }
913
914 #[test]
915 fn test_scrub_content_ascii_control() {
916 let input = "hello\nworld\r\x00\x01\x09end";
918 assert_eq!(scrub_content(input), "helloworldend");
919 }
920
921 #[test]
922 fn test_scrub_content_bidi_overrides() {
923 let input = "safe\u{202A}inject\u{202E}end\u{2066}iso\u{2069}done".to_string();
926 assert_eq!(scrub_content(&input), "safeinjectendisodone");
927 }
928
929 #[test]
930 fn test_scrub_content_zero_width() {
931 let input = "a\u{200B}b\u{200C}c\u{200D}d\u{200F}e".to_string();
934 assert_eq!(scrub_content(&input), "abcde");
935 }
936
937 #[test]
938 fn test_scrub_content_bom() {
939 let input = "\u{FEFF}hello".to_string();
941 assert_eq!(scrub_content(&input), "hello");
942 }
943
944 #[test]
945 fn test_scrub_content_clean_string_unchanged() {
946 let input = "Hello, World! 123 — normal text.";
947 assert_eq!(scrub_content(input), input);
948 }
949
950 #[test]
951 fn test_truncate_prompt_within_limit() {
952 let result = truncate_prompt("short".into(), 100);
953 assert_eq!(result, "short");
954 }
955
956 #[test]
957 fn test_truncate_prompt_zero_max_bytes() {
958 let result = truncate_prompt("hello".into(), 0);
959 assert_eq!(result, "");
960 }
961
962 #[test]
963 fn test_truncate_prompt_long_facts() {
964 let facts: Vec<String> = (0..20)
965 .map(|i| format!("fact_{i}_{}", "x".repeat(20)))
966 .collect();
967 let prompt = facts.join("\n");
968 let result = truncate_prompt(prompt, 200);
969 assert!(
970 result.ends_with("..."),
971 "truncated prompt must end with '...'"
972 );
973 assert!(result.len() <= 203);
975 assert!(std::str::from_utf8(result.as_bytes()).is_ok());
976 }
977
978 #[test]
979 fn test_truncate_prompt_utf8_boundary() {
980 let prompt = "🔥".repeat(100);
982 let result = truncate_prompt(prompt, 10);
983 assert!(
984 result.ends_with("..."),
985 "truncated prompt must end with '...'"
986 );
987 assert_eq!(result.len(), 8 + 3, "2 emojis (8 bytes) + '...' (3 bytes)");
989 assert!(std::str::from_utf8(result.as_bytes()).is_ok());
990 }
991
992 #[tokio::test]
993 async fn test_assign_to_community_majority_vote() {
994 let store = setup().await;
995
996 let a = store
998 .upsert_entity("AA", "AA", EntityType::Concept, None)
999 .await
1000 .unwrap()
1001 .0;
1002 let b = store
1003 .upsert_entity("BB", "BB", EntityType::Concept, None)
1004 .await
1005 .unwrap()
1006 .0;
1007 let d = store
1008 .upsert_entity("DD", "DD", EntityType::Concept, None)
1009 .await
1010 .unwrap()
1011 .0;
1012
1013 store
1014 .upsert_community("test_community", "summary", &[a, b], None)
1015 .await
1016 .unwrap();
1017
1018 store.insert_edge(d, a, "r", "f", 1.0, None).await.unwrap();
1019 store.insert_edge(d, b, "r", "f", 1.0, None).await.unwrap();
1020
1021 let result = assign_to_community(&store, d).await.unwrap();
1022 assert!(result.is_some());
1023
1024 let returned_id = result.unwrap();
1026 let community = store
1027 .find_community_by_id(returned_id)
1028 .await
1029 .unwrap()
1030 .expect("returned community_id must reference an existing row");
1031 assert!(
1032 community.entity_ids.iter().any(|eid| eid.0 == d),
1033 "D should be added to the community"
1034 );
1035 assert!(
1037 community.fingerprint.is_none(),
1038 "fingerprint must be cleared after assign_to_community"
1039 );
1040 }
1041
1042 #[tokio::test]
1044 async fn test_incremental_detection_no_changes_skips_llm() {
1045 let store = setup().await;
1046 let (provider, call_buf) = recording_provider();
1047
1048 let a = store
1049 .upsert_entity("X", "X", EntityType::Concept, None)
1050 .await
1051 .unwrap()
1052 .0;
1053 let b = store
1054 .upsert_entity("Y", "Y", EntityType::Concept, None)
1055 .await
1056 .unwrap()
1057 .0;
1058 store
1059 .insert_edge(a, b, "r", "X relates Y", 1.0, None)
1060 .await
1061 .unwrap();
1062
1063 detect_communities(&store, &provider, usize::MAX, 4, 0)
1065 .await
1066 .unwrap();
1067 let first_calls = call_buf.lock().unwrap().len();
1068 assert_eq!(first_calls, 1, "first run must produce exactly 1 LLM call");
1069
1070 detect_communities(&store, &provider, usize::MAX, 4, 0)
1072 .await
1073 .unwrap();
1074 let second_calls = call_buf.lock().unwrap().len();
1075 assert_eq!(
1076 second_calls, first_calls,
1077 "second run with no graph changes must produce 0 additional LLM calls"
1078 );
1079 }
1080
1081 #[tokio::test]
1083 async fn test_incremental_detection_edge_change_triggers_resummary() {
1084 let store = setup().await;
1085 let (provider, call_buf) = recording_provider();
1086
1087 let a = store
1088 .upsert_entity("P", "P", EntityType::Concept, None)
1089 .await
1090 .unwrap()
1091 .0;
1092 let b = store
1093 .upsert_entity("Q", "Q", EntityType::Concept, None)
1094 .await
1095 .unwrap()
1096 .0;
1097 store
1098 .insert_edge(a, b, "r", "P relates Q", 1.0, None)
1099 .await
1100 .unwrap();
1101
1102 detect_communities(&store, &provider, usize::MAX, 4, 0)
1103 .await
1104 .unwrap();
1105 let after_first = call_buf.lock().unwrap().len();
1106 assert_eq!(after_first, 1);
1107
1108 store
1110 .insert_edge(b, a, "r2", "Q also relates P", 1.0, None)
1111 .await
1112 .unwrap();
1113
1114 detect_communities(&store, &provider, usize::MAX, 4, 0)
1115 .await
1116 .unwrap();
1117 let after_second = call_buf.lock().unwrap().len();
1118 assert_eq!(
1119 after_second, 2,
1120 "edge change must trigger one additional LLM call"
1121 );
1122 }
1123
1124 #[tokio::test]
1126 async fn test_incremental_detection_dissolved_community_deleted() {
1127 let store = setup().await;
1128 let provider = mock_provider();
1129
1130 let a = store
1131 .upsert_entity("M1", "M1", EntityType::Concept, None)
1132 .await
1133 .unwrap()
1134 .0;
1135 let b = store
1136 .upsert_entity("M2", "M2", EntityType::Concept, None)
1137 .await
1138 .unwrap()
1139 .0;
1140 let edge_id = store
1141 .insert_edge(a, b, "r", "M1 relates M2", 1.0, None)
1142 .await
1143 .unwrap();
1144
1145 detect_communities(&store, &provider, usize::MAX, 4, 0)
1146 .await
1147 .unwrap();
1148 assert_eq!(store.community_count().await.unwrap(), 1);
1149
1150 store.invalidate_edge(edge_id).await.unwrap();
1152
1153 detect_communities(&store, &provider, usize::MAX, 4, 0)
1154 .await
1155 .unwrap();
1156 assert_eq!(
1157 store.community_count().await.unwrap(),
1158 0,
1159 "dissolved community must be deleted on next refresh"
1160 );
1161 }
1162
1163 #[tokio::test]
1165 async fn test_detect_communities_concurrency_one() {
1166 let store = setup().await;
1167 let provider = mock_provider();
1168
1169 let a = store
1170 .upsert_entity("C1A", "C1A", EntityType::Concept, None)
1171 .await
1172 .unwrap()
1173 .0;
1174 let b = store
1175 .upsert_entity("C1B", "C1B", EntityType::Concept, None)
1176 .await
1177 .unwrap()
1178 .0;
1179 store.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
1180
1181 let count = detect_communities(&store, &provider, usize::MAX, 1, 0)
1182 .await
1183 .unwrap();
1184 assert_eq!(count, 1, "concurrency=1 must still detect the community");
1185 assert_eq!(store.community_count().await.unwrap(), 1);
1186 }
1187
1188 #[test]
1189 fn test_compute_fingerprint_deterministic() {
1190 let fp1 = compute_partition_fingerprint(&[1, 2, 3], &[10, 20]);
1191 let fp2 = compute_partition_fingerprint(&[3, 1, 2], &[20, 10]);
1192 assert_eq!(fp1, fp2, "fingerprint must be order-independent");
1193
1194 let fp3 = compute_partition_fingerprint(&[1, 2, 3], &[10, 30]);
1195 assert_ne!(
1196 fp1, fp3,
1197 "different edge IDs must produce different fingerprint"
1198 );
1199
1200 let fp4 = compute_partition_fingerprint(&[1, 2, 4], &[10, 20]);
1201 assert_ne!(
1202 fp1, fp4,
1203 "different entity IDs must produce different fingerprint"
1204 );
1205 }
1206
1207 #[test]
1212 fn test_compute_fingerprint_domain_separation() {
1213 let fp_a = compute_partition_fingerprint(&[1, 2], &[3]);
1214 let fp_b = compute_partition_fingerprint(&[1], &[2, 3]);
1215 assert_ne!(
1216 fp_a, fp_b,
1217 "entity/edge sequences with same raw bytes must produce different fingerprints"
1218 );
1219 }
1220
1221 #[tokio::test]
1227 async fn test_detect_communities_chunked_correct_membership() {
1228 let store = setup().await;
1229 let provider = mock_provider();
1230
1231 let node_alpha = store
1233 .upsert_entity("CA", "CA", EntityType::Concept, None)
1234 .await
1235 .unwrap()
1236 .0;
1237 let node_beta = store
1238 .upsert_entity("CB", "CB", EntityType::Concept, None)
1239 .await
1240 .unwrap()
1241 .0;
1242 let node_gamma = store
1243 .upsert_entity("CC", "CC", EntityType::Concept, None)
1244 .await
1245 .unwrap()
1246 .0;
1247 let node_delta = store
1248 .upsert_entity("CD", "CD", EntityType::Concept, None)
1249 .await
1250 .unwrap()
1251 .0;
1252 let node_epsilon = store
1253 .upsert_entity("CE", "CE", EntityType::Concept, None)
1254 .await
1255 .unwrap()
1256 .0;
1257
1258 store
1259 .insert_edge(node_alpha, node_beta, "r", "A-B fact", 1.0, None)
1260 .await
1261 .unwrap();
1262 store
1263 .insert_edge(node_beta, node_gamma, "r", "B-C fact", 1.0, None)
1264 .await
1265 .unwrap();
1266 store
1267 .insert_edge(node_delta, node_epsilon, "r", "D-E fact", 1.0, None)
1268 .await
1269 .unwrap();
1270
1271 let count_chunked = detect_communities(&store, &provider, usize::MAX, 4, 1)
1273 .await
1274 .unwrap();
1275 assert_eq!(
1276 count_chunked, 2,
1277 "chunked loading must detect both communities"
1278 );
1279
1280 let communities = store.all_communities().await.unwrap();
1282 assert_eq!(communities.len(), 2);
1283
1284 let abc_ids = [node_alpha, node_beta, node_gamma];
1285 let de_ids = [node_delta, node_epsilon];
1286 let has_abc = communities.iter().any(|comm| {
1287 abc_ids
1288 .iter()
1289 .all(|id| comm.entity_ids.iter().any(|eid| eid.0 == *id))
1290 });
1291 let has_de = communities.iter().any(|comm| {
1292 de_ids
1293 .iter()
1294 .all(|id| comm.entity_ids.iter().any(|eid| eid.0 == *id))
1295 });
1296 assert!(has_abc, "cluster A-B-C must form a community");
1297 assert!(has_de, "cluster D-E must form a community");
1298 }
1299
1300 #[tokio::test]
1302 async fn test_detect_communities_chunk_size_max() {
1303 let store = setup().await;
1304 let provider = mock_provider();
1305
1306 let x = store
1307 .upsert_entity("MX", "MX", EntityType::Concept, None)
1308 .await
1309 .unwrap()
1310 .0;
1311 let y = store
1312 .upsert_entity("MY", "MY", EntityType::Concept, None)
1313 .await
1314 .unwrap()
1315 .0;
1316 store
1317 .insert_edge(x, y, "r", "X-Y fact", 1.0, None)
1318 .await
1319 .unwrap();
1320
1321 let count = detect_communities(&store, &provider, usize::MAX, 4, usize::MAX)
1322 .await
1323 .unwrap();
1324 assert_eq!(count, 1, "chunk_size=usize::MAX must detect the community");
1325 }
1326
1327 #[tokio::test]
1329 async fn test_detect_communities_chunk_size_zero_fallback() {
1330 let store = setup().await;
1331 let provider = mock_provider();
1332
1333 let p = store
1334 .upsert_entity("ZP", "ZP", EntityType::Concept, None)
1335 .await
1336 .unwrap()
1337 .0;
1338 let q = store
1339 .upsert_entity("ZQ", "ZQ", EntityType::Concept, None)
1340 .await
1341 .unwrap()
1342 .0;
1343 store
1344 .insert_edge(p, q, "r", "P-Q fact", 1.0, None)
1345 .await
1346 .unwrap();
1347
1348 let count = detect_communities(&store, &provider, usize::MAX, 4, 0)
1349 .await
1350 .unwrap();
1351 assert_eq!(
1352 count, 1,
1353 "chunk_size=0 must detect the community via stream fallback"
1354 );
1355 }
1356
1357 #[tokio::test]
1361 async fn test_detect_communities_chunked_edge_map_complete() {
1362 let store = setup().await;
1363 let (provider, call_buf) = recording_provider();
1364
1365 let a = store
1366 .upsert_entity("FA", "FA", EntityType::Concept, None)
1367 .await
1368 .unwrap()
1369 .0;
1370 let b = store
1371 .upsert_entity("FB", "FB", EntityType::Concept, None)
1372 .await
1373 .unwrap()
1374 .0;
1375 store
1376 .insert_edge(a, b, "r", "edge1 fact", 1.0, None)
1377 .await
1378 .unwrap();
1379
1380 detect_communities(&store, &provider, usize::MAX, 4, 1)
1382 .await
1383 .unwrap();
1384 let calls_after_first = call_buf.lock().unwrap().len();
1385 assert_eq!(calls_after_first, 1, "first run must trigger 1 LLM call");
1386
1387 store
1389 .insert_edge(b, a, "r2", "edge2 fact", 1.0, None)
1390 .await
1391 .unwrap();
1392
1393 detect_communities(&store, &provider, usize::MAX, 4, 1)
1394 .await
1395 .unwrap();
1396 let calls_after_second = call_buf.lock().unwrap().len();
1397 assert_eq!(
1398 calls_after_second, 2,
1399 "adding an edge must change fingerprint and trigger re-summarization"
1400 );
1401 }
1402
1403 #[tokio::test]
1405 async fn cleanup_stale_empty_collection() {
1406 let store = setup().await;
1407 let sqlite_store = crate::store::SqliteStore::new(":memory:").await.unwrap();
1408 let pool = sqlite_store.pool().clone();
1409 let mem_store = Box::new(crate::in_memory_store::InMemoryVectorStore::new());
1410 let emb_store = crate::embedding_store::EmbeddingStore::with_store(mem_store, pool);
1411 emb_store
1412 .ensure_named_collection("zeph_graph_entities", 4)
1413 .await
1414 .unwrap();
1415
1416 let deleted = cleanup_stale_entity_embeddings(&store, &emb_store)
1417 .await
1418 .unwrap();
1419 assert_eq!(deleted, 0, "nothing to delete from empty collection");
1420 }
1421
1422 #[tokio::test]
1425 async fn cleanup_stale_deletes_orphaned_points() {
1426 use crate::graph::types::EntityType;
1427
1428 let sqlite_store = crate::store::SqliteStore::new(":memory:").await.unwrap();
1429 let pool = sqlite_store.pool().clone();
1430 let graph_store = GraphStore::new(pool.clone());
1431
1432 let mem_store = Box::new(crate::in_memory_store::InMemoryVectorStore::new());
1433 let emb_store = crate::embedding_store::EmbeddingStore::with_store(mem_store, pool.clone());
1434 emb_store
1435 .ensure_named_collection("zeph_graph_entities", 4)
1436 .await
1437 .unwrap();
1438
1439 let live_id = graph_store
1441 .upsert_entity("Live", "live", EntityType::Person, None)
1442 .await
1443 .unwrap()
1444 .0;
1445 let stale_id = graph_store
1446 .upsert_entity("Stale", "stale", EntityType::Person, None)
1447 .await
1448 .unwrap()
1449 .0;
1450
1451 let live_payload = serde_json::json!({
1453 "entity_id": live_id,
1454 "entity_id_str": live_id.to_string(),
1455 "name": "Live",
1456 });
1457 let stale_payload = serde_json::json!({
1458 "entity_id": stale_id,
1459 "entity_id_str": stale_id.to_string(),
1460 "name": "Stale",
1461 });
1462 emb_store
1463 .store_to_collection(
1464 "zeph_graph_entities",
1465 live_payload,
1466 vec![1.0, 0.0, 0.0, 0.0],
1467 )
1468 .await
1469 .unwrap();
1470 emb_store
1471 .store_to_collection(
1472 "zeph_graph_entities",
1473 stale_payload,
1474 vec![0.0, 1.0, 0.0, 0.0],
1475 )
1476 .await
1477 .unwrap();
1478
1479 zeph_db::query(zeph_db::sql!("DELETE FROM graph_entities WHERE id = ?"))
1481 .bind(stale_id)
1482 .execute(&pool)
1483 .await
1484 .unwrap();
1485
1486 let deleted = cleanup_stale_entity_embeddings(&graph_store, &emb_store)
1487 .await
1488 .unwrap();
1489 assert_eq!(deleted, 1, "exactly one stale point should be removed");
1490
1491 let remaining = emb_store
1493 .scroll_all_entity_ids("zeph_graph_entities")
1494 .await
1495 .unwrap();
1496 assert_eq!(remaining.len(), 1);
1497 assert_eq!(remaining[0].1, live_id);
1498 }
1499}