use std::collections::HashMap;
use std::sync::Arc;
use hirn_core::embed::{ChatMessage, LlmOptions, LlmProvider};
use hirn_core::error::HirnResult;
use hirn_core::id::MemoryId;
use hirn_core::metadata::Metadata;
use hirn_core::semantic::SemanticRecord;
use hirn_core::types::{AgentId, EdgeRelation, KnowledgeType, Layer, Origin};
use crate::db::HirnDB;
use crate::graph_store::GraphStore;
/// Tuning parameters for hierarchical community detection.
#[derive(Debug, Clone)]
pub struct CommunityConfig {
/// Multiplier on the degree-penalty term of the local-move gain.
/// `detect_communities` replaces it with a derived value when
/// `auto_resolution` is `true`.
pub resolution: f64,
/// When `true`, `detect_communities` derives the resolution from the square
/// root of the average weighted degree, limited to the range `[0.1, 10.0]`.
pub auto_resolution: bool,
/// Upper bound on local-move sweeps performed per hierarchy level.
pub max_iterations: usize,
/// Upper bound on the number of hierarchy levels that are built.
pub max_levels: usize,
/// Leaf communities with fewer members than this are dropped from the result.
pub min_community_size: usize,
}
impl Default for CommunityConfig {
/// Returns the defaults: automatic resolution (with `1.0` as the manual
/// fallback value), 10 sweeps per level, 5 levels, and a minimum leaf
/// community size of 2.
fn default() -> Self {
Self {
resolution: 1.0,
auto_resolution: true,
max_iterations: 10,
max_levels: 5,
min_community_size: 2,
}
}
}
/// One community at a single level of the detected hierarchy.
#[derive(Debug, Clone)]
pub struct Community {
/// Hierarchy level; `0` is the leaf level built directly from graph nodes.
pub level: usize,
/// Index of this community within its level.
pub index: usize,
/// Ids of all graph nodes contained in this community.
pub members: Vec<MemoryId>,
/// Index of the enclosing community one level up, if one was assigned.
pub parent: Option<usize>,
/// Indices of the communities one level down that were merged into this one.
pub children: Vec<usize>,
}
/// Output of community detection: the hierarchy plus a node lookup table.
#[derive(Debug, Clone)]
pub struct CommunityResult {
/// Communities grouped by level; `levels[0]` holds the leaf communities.
/// Empty when no grouping was found.
pub levels: Vec<Vec<Community>>,
/// Maps a node id to the index of the leaf community that contains it.
/// When `levels` is empty, `build_community_result` instead maps every
/// node to its own position in the input node list.
pub node_to_community: HashMap<MemoryId, usize>,
/// Number of communities summed over all levels.
pub total_communities: usize,
}
/// Index-based undirected weighted graph used internally by the detection
/// passes.
struct AdjacencyGraph {
/// Number of nodes.
n: usize,
/// Per-node neighbor lists as `(neighbor index, weight)`, sorted by neighbor
/// index with parallel edges merged. Each edge appears in both endpoint
/// lists.
adj: Vec<Vec<(usize, f64)>>,
/// Half of the sum of all adjacency weights, i.e. each edge counted once.
total_weight: f64,
/// Weighted degree of every node (sum of its adjacency weights).
degree: Vec<f64>,
/// Node id stored for each index.
index_to_id: Vec<MemoryId>,
}
impl AdjacencyGraph {
    /// Loads every node and edge of `store` into an undirected weighted graph.
    ///
    /// Edges whose endpoints are not among the store's node ids are skipped,
    /// as are self-loops. Each remaining edge is recorded in both directions.
    ///
    /// # Errors
    /// Propagates any error returned by `node_ids` or `all_edges`.
    async fn from_graph_store(store: &dyn GraphStore) -> HirnResult<Self> {
        let node_ids = store.node_ids().await?;
        let mut adj: Vec<Vec<(usize, f64)>> = vec![Vec::new(); node_ids.len()];
        {
            // The id -> index table is only needed while edges are translated.
            let position_of: HashMap<MemoryId, usize> = node_ids
                .iter()
                .enumerate()
                .map(|(position, id)| (*id, position))
                .collect();
            for edge in store.all_edges().await? {
                let (Some(&src), Some(&tgt)) = (
                    position_of.get(&edge.source),
                    position_of.get(&edge.target),
                ) else {
                    continue;
                };
                if src != tgt {
                    let weight = edge.weight as f64;
                    adj[src].push((tgt, weight));
                    adj[tgt].push((src, weight));
                }
            }
        }
        Ok(Self::from_adjacency(adj, node_ids))
    }

    /// Collapses each community of `assignments` into a single node.
    ///
    /// Only weights between different communities are carried over; weights
    /// inside a community are discarded. The new nodes receive freshly
    /// generated placeholder ids.
    // NOTE(review): classic Louvain/Leiden aggregation keeps intra-community
    // weight as self-loops so the total weight is preserved across levels;
    // here it is dropped. Confirm this is intended.
    fn coarsen(&self, assignments: &[usize], num_communities: usize) -> AdjacencyGraph {
        let mut adj: Vec<Vec<(usize, f64)>> = vec![Vec::new(); num_communities];
        for (node, neighbors) in self.adj.iter().enumerate() {
            let own = assignments[node];
            let crossing = neighbors
                .iter()
                .map(|&(neighbor, weight)| (assignments[neighbor], weight))
                .filter(|&(other, _)| other != own);
            adj[own].extend(crossing);
        }
        let placeholder_ids = (0..num_communities).map(|_| MemoryId::new()).collect();
        Self::from_adjacency(adj, placeholder_ids)
    }

    /// Finishes construction from raw neighbor lists: sorts each list by
    /// neighbor index, merges parallel edges by adding their weights, and
    /// derives the total weight and per-node degrees.
    fn from_adjacency(mut adj: Vec<Vec<(usize, f64)>>, index_to_id: Vec<MemoryId>) -> Self {
        for neighbors in &mut adj {
            // Stable sort keeps the insertion order of parallel edges, so the
            // floating-point accumulation order below is fixed.
            neighbors.sort_by_key(|&(neighbor, _)| neighbor);
            neighbors.dedup_by(|later, kept| {
                let parallel = later.0 == kept.0;
                if parallel {
                    kept.1 += later.1;
                }
                parallel
            });
        }

        // Every edge is stored at both endpoints, hence the division by two.
        let mut doubled_weight = 0.0;
        for &(_, weight) in adj.iter().flatten() {
            doubled_weight += weight;
        }
        let total_weight = doubled_weight / 2.0;

        let degree: Vec<f64> = adj
            .iter()
            .map(|neighbors| neighbors.iter().map(|&(_, weight)| weight).sum())
            .collect();

        AdjacencyGraph {
            n: adj.len(),
            adj,
            total_weight,
            degree,
            index_to_id,
        }
    }
}
/// Detects a hierarchy of communities in the graph held by `store`.
///
/// The graph is loaded once, then repeatedly partitioned by
/// `leiden_one_level` and coarsened, for at most `config.max_levels` levels.
/// The loop stops early when the current graph has a single node or when a
/// pass no longer reduces the number of communities.
///
/// When `config.auto_resolution` is set, the resolution is derived from the
/// square root of the average weighted degree, limited to `[0.1, 10.0]`;
/// otherwise `config.resolution` is used as given.
///
/// # Errors
/// Propagates errors from reading the graph store.
pub async fn detect_communities(
    store: &dyn GraphStore,
    config: &CommunityConfig,
) -> HirnResult<CommunityResult> {
    let adj = AdjacencyGraph::from_graph_store(store).await?;
    if adj.n == 0 {
        return Ok(CommunityResult {
            levels: vec![],
            node_to_community: HashMap::new(),
            total_communities: 0,
        });
    }

    let effective_resolution = if config.auto_resolution {
        let avg_degree = 2.0 * adj.total_weight / adj.n as f64;
        // `max` then `min` (rather than `clamp`) so a NaN square root falls
        // back to the lower bound instead of propagating.
        avg_degree.sqrt().max(0.1_f64).min(10.0_f64)
    } else {
        config.resolution
    };
    let effective_config = CommunityConfig {
        resolution: effective_resolution,
        auto_resolution: false,
        ..*config
    };

    let base_index_to_id = adj.index_to_id.clone();
    let mut all_levels: Vec<Vec<usize>> = Vec::new();
    let mut current_graph = adj;
    for _level in 0..effective_config.max_levels {
        if current_graph.n <= 1 {
            break;
        }
        let assignments = leiden_one_level(&current_graph, &effective_config);
        let num_communities = *assignments.iter().max().unwrap_or(&0) + 1;
        // No node was merged with another: the hierarchy has converged.
        if num_communities >= current_graph.n {
            break;
        }
        current_graph = current_graph.coarsen(&assignments, num_communities);
        all_levels.push(assignments);
    }

    Ok(build_community_result(
        &base_index_to_id,
        &all_levels,
        &effective_config,
    ))
}
/// Partitions `graph` into communities for one hierarchy level.
///
/// Every node starts in its own community. Up to `config.max_iterations`
/// sweeps then move each node to the neighboring community with the largest
/// strictly positive gain, where the gain is
/// `(w_to_candidate - w_in_current) + resolution * k_i * (sigma_current - k_i - sigma_candidate) / 2m`.
/// Sweeping stops as soon as a sweep moves nothing. The result is passed
/// through `refine_communities` and returned as one dense community index
/// per node.
///
/// Candidates with exactly equal gain are resolved in favor of the lowest
/// community index, so the outcome does not depend on hash-map iteration
/// order.
fn leiden_one_level(graph: &AdjacencyGraph, config: &CommunityConfig) -> Vec<usize> {
    let n = graph.n;
    let mut assignment: Vec<usize> = (0..n).collect();
    let mut num_communities = n;
    // The graph is fixed for the whole call, so 2m can be computed once.
    let m2 = 2.0 * graph.total_weight;

    for _iteration in 0..config.max_iterations {
        // Without any edge weight no node can be moved.
        if m2 == 0.0 {
            break;
        }

        let mut improved = false;
        let mut comm_degree: HashMap<usize, f64> = HashMap::new();
        for (i, &c) in assignment.iter().enumerate() {
            *comm_degree.entry(c).or_default() += graph.degree[i];
        }

        for node in 0..n {
            let current_comm = assignment[node];

            // Total edge weight from `node` into each neighboring community.
            let mut comm_weights: HashMap<usize, f64> = HashMap::new();
            for &(neighbor, w) in &graph.adj[node] {
                *comm_weights.entry(assignment[neighbor]).or_default() += w;
            }

            let ki = graph.degree[node];
            let w_in_current = comm_weights.get(&current_comm).copied().unwrap_or(0.0);
            let sigma_current = comm_degree.get(&current_comm).copied().unwrap_or(0.0);

            let mut best_comm = current_comm;
            let mut best_delta = 0.0;
            for (&candidate_comm, &w_to_candidate) in &comm_weights {
                if candidate_comm == current_comm {
                    continue;
                }
                let sigma_candidate = comm_degree.get(&candidate_comm).copied().unwrap_or(0.0);
                let delta = (w_to_candidate - w_in_current)
                    + config.resolution * ki * (sigma_current - ki - sigma_candidate) / m2;
                // A tie only counts against an already chosen candidate, so a
                // zero gain still never moves the node.
                let wins_tie = delta == best_delta
                    && best_comm != current_comm
                    && candidate_comm < best_comm;
                if delta > best_delta || wins_tie {
                    best_delta = delta;
                    best_comm = candidate_comm;
                }
            }

            if best_comm != current_comm {
                *comm_degree.entry(current_comm).or_default() -= ki;
                *comm_degree.entry(best_comm).or_default() += ki;
                assignment[node] = best_comm;
                improved = true;
            }
        }

        if !improved {
            break;
        }
        // Re-number communities densely so emptied ones do not leave gaps.
        let (compacted, new_count) = compact_assignments(&assignment);
        assignment = compacted;
        num_communities = new_count;
    }

    refine_communities(&assignment, num_communities, graph, config)
}
/// Re-labels community ids so they form the dense range `0..count`.
///
/// Labels are handed out in order of first appearance. Returns the re-labeled
/// assignments together with the number of distinct communities.
fn compact_assignments(assignments: &[usize]) -> (Vec<usize>, usize) {
    let mut dense_ids: HashMap<usize, usize> = HashMap::new();
    let mut compacted = Vec::with_capacity(assignments.len());
    for &label in assignments {
        // The next unused dense id always equals the number of labels seen.
        let fresh = dense_ids.len();
        compacted.push(*dense_ids.entry(label).or_insert(fresh));
    }
    let count = dense_ids.len();
    (compacted, count)
}
/// Post-processes a partition by moving weakly attached nodes.
///
/// For up to three passes, each node is moved to the neighboring community it
/// shares the most edge weight with, provided that weight is strictly larger
/// than the weight it shares with its own community. Passes stop early once
/// nothing moves. The returned assignments are re-labeled densely.
///
/// Neighboring communities with exactly equal weight are resolved in favor
/// of the lowest community index, so the outcome does not depend on hash-map
/// iteration order.
///
/// `_num_communities` and `_config` are accepted for signature compatibility
/// and are not used.
fn refine_communities(
    assignments: &[usize],
    _num_communities: usize,
    graph: &AdjacencyGraph,
    _config: &CommunityConfig,
) -> Vec<usize> {
    let mut refined = assignments.to_vec();
    for _pass in 0..3 {
        let mut changed = false;
        for node in 0..graph.n {
            let my_comm = refined[node];

            // Split the node's edge weight into internal and per-community
            // external shares.
            let mut w_internal = 0.0;
            let mut ext_weights: HashMap<usize, f64> = HashMap::new();
            for &(neighbor, w) in &graph.adj[node] {
                if refined[neighbor] == my_comm {
                    w_internal += w;
                } else {
                    *ext_weights.entry(refined[neighbor]).or_default() += w;
                }
            }

            let mut best_external_comm = my_comm;
            let mut best_external_weight = 0.0;
            for (&c, &w) in &ext_weights {
                let wins_tie = w == best_external_weight
                    && best_external_comm != my_comm
                    && c < best_external_comm;
                if w > best_external_weight || wins_tie {
                    best_external_weight = w;
                    best_external_comm = c;
                }
            }

            if best_external_weight > w_internal && best_external_comm != my_comm {
                refined[node] = best_external_comm;
                changed = true;
            }
        }
        if !changed {
            break;
        }
    }
    compact_assignments(&refined).0
}
/// Turns per-level assignment vectors into the public community hierarchy.
///
/// `levels[0]` assigns every base node (by position in `base_index_to_id`) to
/// a leaf community; each later entry assigns the communities of the level
/// below to a community one level up.
///
/// Leaf communities smaller than `config.min_community_size` are dropped, and
/// higher-level communities left without members are dropped as well. At
/// every level the surviving communities are numbered by their position, and
/// `parent` / `children` refer to those numbers. Raw community ids from the
/// assignment vectors are translated through a per-level lookup table, so
/// dropping a community never shifts its siblings onto the wrong parent.
///
/// With no levels at all, the result has no communities and maps every node
/// to its own position.
fn build_community_result(
    base_index_to_id: &[MemoryId],
    levels: &[Vec<usize>],
    config: &CommunityConfig,
) -> CommunityResult {
    let Some((base_assignments, higher_assignments)) = levels.split_first() else {
        let node_to_community = base_index_to_id
            .iter()
            .enumerate()
            .map(|(position, id)| (*id, position))
            .collect();
        return CommunityResult {
            levels: vec![],
            node_to_community,
            total_communities: 0,
        };
    };

    // Gather the members of every raw leaf community.
    let num_base = *base_assignments.iter().max().unwrap_or(&0) + 1;
    let mut leaf_members: Vec<Vec<MemoryId>> = vec![Vec::new(); num_base];
    for (&id, &comm) in base_index_to_id.iter().zip(base_assignments) {
        leaf_members[comm].push(id);
    }

    // `kept_position[raw]` is where raw community `raw` of the most recently
    // built level ended up after filtering, or `None` if it was dropped.
    let mut kept_position: Vec<Option<usize>> = vec![None; num_base];
    let mut leaves: Vec<Community> = Vec::new();
    for (raw, members) in leaf_members.into_iter().enumerate() {
        if members.len() < config.min_community_size {
            continue;
        }
        let position = leaves.len();
        kept_position[raw] = Some(position);
        leaves.push(Community {
            level: 0,
            index: position,
            members,
            parent: None,
            children: vec![],
        });
    }

    let node_to_community: HashMap<MemoryId, usize> = leaves
        .iter()
        .flat_map(|leaf| leaf.members.iter().map(move |&member| (member, leaf.index)))
        .collect();

    let mut all_levels: Vec<Vec<Community>> = vec![leaves];
    for (below_idx, assignments) in higher_assignments.iter().enumerate() {
        let num_comms = *assignments.iter().max().unwrap_or(&0) + 1;
        let mut higher: Vec<Community> = (0..num_comms)
            .map(|raw| Community {
                level: below_idx + 1,
                index: raw,
                members: vec![],
                parent: None,
                children: vec![],
            })
            .collect();

        // Attach every surviving lower community to its raw parent.
        let below = &mut all_levels[below_idx];
        for (raw_child, &raw_parent) in assignments.iter().enumerate() {
            let Some(child_position) = kept_position.get(raw_child).copied().flatten() else {
                continue;
            };
            let child = &mut below[child_position];
            child.parent = Some(raw_parent);
            higher[raw_parent].children.push(child.index);
            higher[raw_parent].members.extend_from_slice(&child.members);
        }

        // Drop empty communities and number the rest by position.
        let mut position_here: Vec<Option<usize>> = vec![None; num_comms];
        let mut kept: Vec<Community> = Vec::new();
        for (raw, mut community) in higher.into_iter().enumerate() {
            if community.members.is_empty() {
                continue;
            }
            let position = kept.len();
            position_here[raw] = Some(position);
            community.index = position;
            kept.push(community);
        }

        // Parents were recorded as raw ids; translate them to final indices.
        for child in all_levels[below_idx].iter_mut() {
            child.parent = child
                .parent
                .and_then(|raw| position_here.get(raw).copied().flatten());
        }

        all_levels.push(kept);
        kept_position = position_here;
    }

    let total = all_levels.iter().map(|level| level.len()).sum();
    CommunityResult {
        levels: all_levels,
        node_to_community,
        total_communities: total,
    }
}
/// Difference between two leaf-level community partitions, as computed by
/// `compute_community_delta`.
#[derive(Debug, Clone)]
pub struct CommunityDelta {
/// Indices of new communities that share no member with any previous
/// community.
pub added: Vec<usize>,
/// Indices of new communities whose exact member set did not exist before
/// but that share at least one member with a previous community.
pub modified: Vec<usize>,
/// Indices of new communities whose exact member set already existed.
pub unchanged: Vec<usize>,
/// Indices of previous communities whose exact member set no longer exists.
pub removed: Vec<usize>,
}
/// Classifies the leaf communities of `new` relative to those of `prev`.
///
/// Communities are compared by their member sets, not by index:
/// - a new community with a member set that also exists in `prev` is
///   *unchanged*;
/// - otherwise it is *modified* if it shares at least one member with any
///   previous community, and *added* if it shares none;
/// - a previous community whose exact member set is absent from `new` is
///   *removed*.
///
/// `added`, `modified` and `unchanged` carry indices from `new`; `removed`
/// carries indices from `prev`. Only level 0 of each result is inspected.
pub fn compute_community_delta(prev: &CommunityResult, new: &CommunityResult) -> CommunityDelta {
    use std::collections::HashSet;

    /// Order-independent identity of a community: its sorted member ids.
    fn member_key(community: &Community) -> Vec<MemoryId> {
        let mut ids = community.members.clone();
        ids.sort();
        ids
    }

    let prev_leaves = prev.levels.first().map(|l| l.as_slice()).unwrap_or(&[]);
    let new_leaves = new.levels.first().map(|l| l.as_slice()).unwrap_or(&[]);

    let prev_keys: HashSet<Vec<MemoryId>> = prev_leaves.iter().map(member_key).collect();
    let new_keys: HashSet<Vec<MemoryId>> = new_leaves.iter().map(member_key).collect();
    // Built once so the overlap test below is a set lookup per member instead
    // of a scan over every previous community.
    let prev_members: HashSet<MemoryId> = prev_leaves
        .iter()
        .flat_map(|community| community.members.iter().copied())
        .collect();

    let mut added = Vec::new();
    let mut modified = Vec::new();
    let mut unchanged = Vec::new();
    for community in new_leaves {
        if prev_keys.contains(&member_key(community)) {
            unchanged.push(community.index);
        } else if community
            .members
            .iter()
            .any(|member| prev_members.contains(member))
        {
            modified.push(community.index);
        } else {
            added.push(community.index);
        }
    }

    let removed: Vec<usize> = prev_leaves
        .iter()
        .filter(|pc| !new_keys.contains(&member_key(pc)))
        .map(|pc| pc.index)
        .collect();

    CommunityDelta {
        added,
        modified,
        unchanged,
        removed,
    }
}
/// Counters reported by the community summary generators.
#[derive(Debug, Clone)]
pub struct CommunitySummaryResult {
/// Number of new summary records written to the database.
pub summaries_stored: usize,
/// Number of summary/member edges reported as created, including edges
/// repaired for summaries that already existed.
pub edges_created: usize,
}
/// Reports whether a directed `relation` edge from `source` to `target` is
/// present in the cached graph.
///
/// A failed lookup is logged as a warning and treated as "no such edge".
async fn community_edge_exists(
    db: &HirnDB,
    source: MemoryId,
    target: MemoryId,
    relation: EdgeRelation,
) -> bool {
    let edges = match db.cached_graph().get_edges_between(source, target).await {
        Ok(edges) => edges,
        Err(error) => {
            tracing::warn!(
                source = %source,
                target = %target,
                relation = ?relation,
                error = %error,
                "failed to inspect community summary edge"
            );
            return false;
        }
    };
    // The lookup result is filtered again on direction and relation.
    edges.iter().any(|candidate| {
        candidate.source == source && candidate.target == target && candidate.relation == relation
    })
}
async fn ensure_community_edge(
db: &HirnDB,
source: MemoryId,
target: MemoryId,
relation: EdgeRelation,
) -> bool {
if community_edge_exists(db, source, target, relation).await {
return false;
}
match db
.connect_with(source, target, relation, 1.0, Metadata::default())
.await
{
Ok(_) => true,
Err(hirn_core::HirnError::AlreadyExists(error)) => {
if community_edge_exists(db, source, target, relation).await {
true
} else {
tracing::warn!(
source = %source,
target = %target,
relation = ?relation,
error = %error,
"community edge write reported duplicate without leaving a visible edge"
);
false
}
}
Err(error) => {
tracing::warn!(
source = %source,
target = %target,
relation = ?relation,
error = %error,
"failed to create community summary edge"
);
false
}
}
}
/// Ensures both membership edges exist between a summary and each member:
/// `DerivedFrom` from the summary to the member and `PartOf` from the member
/// to the summary.
///
/// Returns how many edges `ensure_community_edge` reported as created.
async fn repair_community_membership_edges(
    db: &HirnDB,
    summary_id: MemoryId,
    members: &[MemoryId],
) -> usize {
    let mut created = 0;
    for &member_id in members {
        let derived_from =
            ensure_community_edge(db, summary_id, member_id, EdgeRelation::DerivedFrom).await;
        let part_of =
            ensure_community_edge(db, member_id, summary_id, EdgeRelation::PartOf).await;
        created += usize::from(derived_from) + usize::from(part_of);
    }
    created
}
pub async fn generate_community_summaries(
db: &HirnDB,
llm: &Arc<dyn LlmProvider>,
communities: &CommunityResult,
max_members_per_prompt: usize,
llm_timeout: std::time::Duration,
) -> HirnResult<CommunitySummaryResult> {
if communities.levels.is_empty() {
return Ok(CommunitySummaryResult {
summaries_stored: 0,
edges_created: 0,
});
}
let agent = AgentId::well_known("community");
let leaf_communities = &communities.levels[0];
let mut summaries_stored = 0;
let mut edges_created = 0;
for community in leaf_communities {
if community.members.is_empty() {
continue;
}
let descriptions =
collect_member_descriptions(db, &community.members, max_members_per_prompt).await;
if descriptions.is_empty() {
continue;
}
let concept_name = format!("community-{}-{}", community.level, community.index);
if let Ok(existing) = db.get_semantic_by_concept(&concept_name).await {
edges_created +=
repair_community_membership_edges(db, existing.id, &community.members).await;
continue;
}
let member_text = descriptions
.iter()
.enumerate()
.map(|(i, d)| format!("{}. {}", i + 1, d))
.collect::<Vec<_>>()
.join("\n");
let system = ChatMessage {
role: "system".to_string(),
content: "You are an analyst that produces concise community summaries. \
Given a list of related memory descriptions, produce a structured summary \
with the following format:\n\
THEME: <one-line theme>\n\
KEY_ENTITIES: <comma-separated key entities>\n\
SUMMARY: <2-4 sentence summary including representative examples>"
.to_string(),
};
let sanitized_member_text = hirn_core::sanitize::sanitize_for_llm(&member_text);
let user = ChatMessage {
role: "user".to_string(),
content: format!(
"Summarize the following {} related memories (community level {}, index {}) \
into a structured community summary:\n\n{}",
descriptions.len(),
community.level,
community.index,
sanitized_member_text
),
};
let options = LlmOptions {
temperature: 0.3,
max_tokens: 256,
..Default::default()
};
let summary =
super::generate_text_with_timeout(llm.as_ref(), &[system, user], &options, llm_timeout)
.await?;
let mut builder = SemanticRecord::builder()
.concept(&concept_name)
.knowledge_type(KnowledgeType::Community)
.description(&summary)
.confidence(0.7)
.agent_id(agent.clone())
.origin(Origin::Consolidation);
if let Ok(emb) = db.embed_text(&summary).await {
builder = builder.embedding(emb);
}
for &member_id in &community.members {
builder = builder.source_episode(member_id);
}
let record = builder.build()?;
let semantic_id = db.store_semantic(record).await?;
summaries_stored += 1;
edges_created +=
repair_community_membership_edges(db, semantic_id, &community.members).await;
}
Ok(CommunitySummaryResult {
summaries_stored,
edges_created,
})
}
pub async fn generate_community_summaries_incremental(
db: &HirnDB,
llm: &Arc<dyn LlmProvider>,
prev: &CommunityResult,
new: &CommunityResult,
max_members_per_prompt: usize,
llm_timeout: std::time::Duration,
) -> HirnResult<CommunitySummaryResult> {
let delta = compute_community_delta(prev, new);
for &removed_idx in &delta.removed {
let concept_name = format!("community-0-{removed_idx}");
if let Ok(record) = db.get_semantic_by_concept(&concept_name).await {
db.purge_semantic(record.id).await?;
}
}
for &modified_idx in &delta.modified {
let concept_name = format!("community-0-{modified_idx}");
if let Ok(record) = db.get_semantic_by_concept(&concept_name).await {
db.purge_semantic(record.id).await?;
}
}
let needs_summary: std::collections::HashSet<usize> = delta
.added
.iter()
.chain(delta.modified.iter())
.copied()
.collect();
let filtered_leaves: Vec<Community> = new
.levels
.first()
.map(|l| {
l.iter()
.filter(|c| needs_summary.contains(&c.index))
.cloned()
.collect()
})
.unwrap_or_default();
let unchanged_leaves: Vec<Community> = new
.levels
.first()
.map(|l| {
l.iter()
.filter(|c| !needs_summary.contains(&c.index))
.cloned()
.collect()
})
.unwrap_or_default();
let mut result = if filtered_leaves.is_empty() {
CommunitySummaryResult {
summaries_stored: 0,
edges_created: 0,
}
} else {
let filtered = CommunityResult {
levels: vec![filtered_leaves],
node_to_community: new.node_to_community.clone(),
total_communities: needs_summary.len(),
};
generate_community_summaries(db, llm, &filtered, max_members_per_prompt, llm_timeout)
.await?
};
for community in unchanged_leaves {
let concept_name = format!("community-{}-{}", community.level, community.index);
if let Ok(existing) = db.get_semantic_by_concept(&concept_name).await {
result.edges_created +=
repair_community_membership_edges(db, existing.id, &community.members).await;
}
}
Ok(result)
}
/// Fetches a textual description for up to `max` of the given members.
///
/// Semantic members are rendered as `"<concept>: <description>"`, episodic
/// members as their content. Members whose layer cannot be determined, whose
/// layer is neither of those, or whose record cannot be loaded are skipped.
async fn collect_member_descriptions(db: &HirnDB, members: &[MemoryId], max: usize) -> Vec<String> {
    let graph = db.graph_store();
    let sample = &members[..members.len().min(max)];

    // First resolve every member's layer, then load the records.
    let mut layers: Vec<Option<Layer>> = Vec::with_capacity(sample.len());
    for &id in sample {
        layers.push(graph.node_layer(id).await.ok().flatten());
    }

    let mut descriptions = Vec::new();
    for (&id, layer) in sample.iter().zip(layers) {
        let text = match layer {
            Some(Layer::Semantic) => match db.get_semantic(id).await {
                Ok(record) => Some(format!("{}: {}", record.concept, record.description)),
                Err(_) => None,
            },
            Some(Layer::Episodic) => match db.get_episode(id).await {
                Ok(record) => Some(record.content.clone()),
                Err(_) => None,
            },
            _ => None,
        };
        descriptions.extend(text);
    }
    descriptions
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// LLM timeout passed to every summary call in these tests.
    const LLM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

    /// LLM stub that returns a canned response and counts its invocations.
    struct MockCommunityLlm {
        response: String,
        calls: AtomicUsize,
    }

    impl MockCommunityLlm {
        fn new(response: &str) -> Self {
            Self {
                response: response.to_string(),
                calls: AtomicUsize::new(0),
            }
        }
    }

    #[async_trait::async_trait]
    impl LlmProvider for MockCommunityLlm {
        async fn generate_text(
            &self,
            _messages: &[ChatMessage],
            _options: &LlmOptions,
        ) -> hirn_core::HirnResult<String> {
            self.calls.fetch_add(1, Ordering::Relaxed);
            Ok(self.response.clone())
        }

        fn model_id(&self) -> &str {
            "mock-community"
        }
    }

    /// Creates a mock LLM and returns it both as the concrete type (to read
    /// the call counter) and as the trait object the code under test expects.
    fn mock_llm(response: &str) -> (Arc<MockCommunityLlm>, Arc<dyn LlmProvider>) {
        let mock = Arc::new(MockCommunityLlm::new(response));
        let llm: Arc<dyn LlmProvider> = mock.clone();
        (mock, llm)
    }

    /// Opens a fresh database inside a new temporary directory.
    async fn test_db() -> HirnDB {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("test");
        let lance_path = dir.path().join("lance");
        let mut config = hirn_core::HirnConfig::default();
        config.db_path = db_path;
        config.embedding_dimensions = hirn_core::EmbeddingDimension::new_const(3);
        let storage: Arc<dyn hirn_storage::PhysicalStore> = hirn_storage::HirnDb::open(
            hirn_storage::HirnDbConfig::local(lance_path.to_str().unwrap()),
        )
        .await
        .unwrap()
        .store_arc();
        let db = HirnDB::open_with_config(config, storage).await.unwrap();
        // The directory guard is leaked on purpose so the files outlive this
        // function; the directory is therefore never cleaned up.
        std::mem::forget(dir);
        db
    }

    /// Stores `count` semantic records whose concept and description are
    /// produced from the record's position, and returns their ids in order.
    async fn store_members(
        db: &HirnDB,
        count: usize,
        concept: impl Fn(usize) -> String,
        description: impl Fn(usize) -> String,
    ) -> Vec<MemoryId> {
        let agent = AgentId::new("test").unwrap();
        let mut ids = Vec::new();
        for i in 0..count {
            let record = SemanticRecord::builder()
                .concept(&concept(i))
                .description(&description(i))
                .agent_id(agent.clone())
                .origin(Origin::Consolidation)
                .build()
                .unwrap();
            ids.push(db.store_semantic(record).await.unwrap());
        }
        ids
    }

    /// Builds a level-0 community with the given index and members.
    fn leaf(index: usize, members: &[MemoryId]) -> Community {
        Community {
            level: 0,
            index,
            members: members.to_vec(),
            parent: None,
            children: vec![],
        }
    }

    /// Wraps leaf communities in a single-level result whose lookup table
    /// maps every member to its community's index.
    fn single_level(leaves: Vec<Community>) -> CommunityResult {
        let mut node_to_community = HashMap::new();
        for community in &leaves {
            for &id in &community.members {
                node_to_community.insert(id, community.index);
            }
        }
        CommunityResult {
            total_communities: leaves.len(),
            levels: vec![leaves],
            node_to_community,
        }
    }

    /// Deletes the summary -> member `DerivedFrom` edges and, when
    /// `include_part_of` is set, the member -> summary `PartOf` edges.
    async fn remove_membership_edges(
        db: &HirnDB,
        summary_id: MemoryId,
        member_ids: &[MemoryId],
        include_part_of: bool,
    ) {
        for &member_id in member_ids {
            let edges = db
                .cached_graph()
                .get_edges_between(summary_id, member_id)
                .await
                .unwrap();
            for edge in edges {
                let is_derived_from = edge.relation == EdgeRelation::DerivedFrom
                    && edge.source == summary_id
                    && edge.target == member_id;
                let is_part_of = include_part_of
                    && edge.relation == EdgeRelation::PartOf
                    && edge.source == member_id
                    && edge.target == summary_id;
                if is_derived_from || is_part_of {
                    db.cached_graph().remove_edge(edge.id).await.unwrap();
                }
            }
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn summary_empty_communities() {
        let db = test_db().await;
        let (_mock, llm) = mock_llm("summary text");
        let empty = CommunityResult {
            levels: vec![],
            node_to_community: HashMap::new(),
            total_communities: 0,
        };

        let result = generate_community_summaries(&db, &llm, &empty, 50, LLM_TIMEOUT)
            .await
            .unwrap();

        assert_eq!(result.summaries_stored, 0);
        assert_eq!(result.edges_created, 0);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn summary_generated_and_stored() {
        let db = test_db().await;
        let (_mock, llm) = mock_llm(
            "THEME: Testing patterns\n\
             KEY_ENTITIES: test-concept-0, test-concept-1, test-concept-2\n\
             SUMMARY: This community is about testing. It covers 3 related concepts.",
        );
        let member_ids = store_members(
            &db,
            3,
            |i| format!("test-concept-{i}"),
            |i| format!("Description for concept {i}"),
        )
        .await;
        let communities = single_level(vec![leaf(0, &member_ids)]);

        let result = generate_community_summaries(&db, &llm, &communities, 50, LLM_TIMEOUT)
            .await
            .unwrap();

        assert_eq!(result.summaries_stored, 1);
        let stored = db.get_semantic_by_concept("community-0-0").await.unwrap();
        assert_eq!(stored.knowledge_type, KnowledgeType::Community);
        assert!(stored.description.contains("THEME:"));
        assert!(stored.description.contains("KEY_ENTITIES:"));
        assert_eq!(stored.source_episodes.len(), 3);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn summary_idempotent() {
        let db = test_db().await;
        let (mock, llm) = mock_llm("Summary.");
        let member_ids = store_members(
            &db,
            1,
            |_| "member-x".to_string(),
            |_| "x desc".to_string(),
        )
        .await;
        let communities = single_level(vec![leaf(0, &member_ids)]);

        let r1 = generate_community_summaries(&db, &llm, &communities, 50, LLM_TIMEOUT)
            .await
            .unwrap();
        let r2 = generate_community_summaries(&db, &llm, &communities, 50, LLM_TIMEOUT)
            .await
            .unwrap();

        assert_eq!(r1.summaries_stored, 1);
        assert_eq!(r2.summaries_stored, 0);
        assert_eq!(mock.calls.load(Ordering::Relaxed), 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn summary_rerun_repairs_missing_membership_edges() {
        let db = test_db().await;
        let (mock, llm) = mock_llm("Summary.");
        let member_ids =
            store_members(&db, 3, |i| format!("member-{i}"), |_| "member".to_string()).await;
        let communities = single_level(vec![leaf(0, &member_ids)]);

        let first = generate_community_summaries(&db, &llm, &communities, 50, LLM_TIMEOUT)
            .await
            .unwrap();
        assert_eq!(first.summaries_stored, 1);

        // Break both edge directions, then expect the rerun to restore them.
        let summary = db.get_semantic_by_concept("community-0-0").await.unwrap();
        remove_membership_edges(&db, summary.id, &member_ids, true).await;

        let second = generate_community_summaries(&db, &llm, &communities, 50, LLM_TIMEOUT)
            .await
            .unwrap();

        assert_eq!(second.summaries_stored, 0);
        assert_eq!(second.edges_created, member_ids.len() * 2);
        assert_eq!(mock.calls.load(Ordering::Relaxed), 1);
        for &member_id in &member_ids {
            assert!(
                community_edge_exists(&db, summary.id, member_id, EdgeRelation::DerivedFrom).await,
                "summary should regain DerivedFrom edge to member"
            );
            assert!(
                community_edge_exists(&db, member_id, summary.id, EdgeRelation::PartOf).await,
                "member should regain PartOf edge to summary"
            );
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn incremental_only_affected_communities_regenerated() {
        let db = test_db().await;
        let (mock, llm) = mock_llm("THEME: Test\nKEY_ENTITIES: a\nSUMMARY: Test.");

        let cluster_a = store_members(
            &db,
            3,
            |i| format!("auth-concept-{i}"),
            |i| format!("Auth pattern {i}"),
        )
        .await;
        let cluster_b = store_members(
            &db,
            3,
            |i| format!("cache-concept-{i}"),
            |i| format!("Cache pattern {i}"),
        )
        .await;
        let prev = single_level(vec![leaf(0, &cluster_a), leaf(1, &cluster_b)]);

        let r1 = generate_community_summaries(&db, &llm, &prev, 50, LLM_TIMEOUT)
            .await
            .unwrap();
        assert_eq!(r1.summaries_stored, 2);
        assert_eq!(mock.calls.load(Ordering::Relaxed), 2);

        // A third, unrelated cluster appears; the first two stay as they were.
        let cluster_c = store_members(
            &db,
            5,
            |i| format!("new-topic-{i}"),
            |i| format!("New topic episode {i}"),
        )
        .await;
        let new = single_level(vec![
            leaf(0, &cluster_a),
            leaf(1, &cluster_b),
            leaf(2, &cluster_c),
        ]);

        mock.calls.store(0, Ordering::Relaxed);
        let r2 = generate_community_summaries_incremental(&db, &llm, &prev, &new, 50, LLM_TIMEOUT)
            .await
            .unwrap();

        assert_eq!(
            r2.summaries_stored, 1,
            "only the new community should be summarized"
        );
        assert_eq!(
            mock.calls.load(Ordering::Relaxed),
            1,
            "LLM called only for new community"
        );
        assert!(db.get_semantic_by_concept("community-0-2").await.is_ok());
        assert!(db.get_semantic_by_concept("community-0-0").await.is_ok());
        assert!(db.get_semantic_by_concept("community-0-1").await.is_ok());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn incremental_rerun_repairs_edges_for_unchanged_communities() {
        let db = test_db().await;
        let (mock, llm) = mock_llm("Summary.");
        let member_ids = store_members(
            &db,
            2,
            |i| format!("inc-member-{i}"),
            |_| "member".to_string(),
        )
        .await;
        let communities = single_level(vec![leaf(0, &member_ids)]);

        generate_community_summaries(&db, &llm, &communities, 50, LLM_TIMEOUT)
            .await
            .unwrap();

        // Only the summary -> member direction is broken here.
        let summary = db.get_semantic_by_concept("community-0-0").await.unwrap();
        remove_membership_edges(&db, summary.id, &member_ids, false).await;

        let rerun = generate_community_summaries_incremental(
            &db,
            &llm,
            &communities,
            &communities,
            50,
            LLM_TIMEOUT,
        )
        .await
        .unwrap();

        assert_eq!(rerun.summaries_stored, 0);
        assert_eq!(rerun.edges_created, member_ids.len());
        assert_eq!(mock.calls.load(Ordering::Relaxed), 1);
        for &member_id in &member_ids {
            assert!(
                community_edge_exists(&db, summary.id, member_id, EdgeRelation::DerivedFrom).await,
                "incremental rerun should repair unchanged community summary edge"
            );
        }
    }

    #[test]
    fn compute_delta_identifies_changes() {
        let ids_a: Vec<MemoryId> = (0..3).map(|_| MemoryId::new()).collect();
        let ids_b: Vec<MemoryId> = (0..3).map(|_| MemoryId::new()).collect();
        let ids_c: Vec<MemoryId> = (0..3).map(|_| MemoryId::new()).collect();

        let prev = CommunityResult {
            levels: vec![vec![leaf(0, &ids_a), leaf(1, &ids_b)]],
            node_to_community: HashMap::new(),
            total_communities: 2,
        };
        let new = CommunityResult {
            levels: vec![vec![leaf(0, &ids_a), leaf(2, &ids_c)]],
            node_to_community: HashMap::new(),
            total_communities: 2,
        };

        let delta = compute_community_delta(&prev, &new);

        assert!(
            delta.unchanged.contains(&0),
            "community 0 should be unchanged"
        );
        assert!(delta.added.contains(&2), "community 2 should be added");
        assert!(delta.removed.contains(&1), "community 1 should be removed");
        assert!(
            delta.modified.is_empty(),
            "no communities should be modified"
        );
    }
}