use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Mutex;
use std::time::Instant;
use pulsedb::{
CollectiveId, Experience, ExperienceId, InsightType, NewDerivedInsight, SubstrateProvider,
};
use pulsehive_core::llm::{LlmConfig, LlmProvider, Message};
use tracing::Instrument;
/// Tuning knobs for `InsightSynthesizer`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InsightSynthesizerConfig {
    /// Minimum cluster size at which `InsightSynthesizer::should_synthesize`
    /// returns `true` (the comparison is inclusive).
    pub relation_density_threshold: usize,
    /// Minimum number of whole seconds between two syntheses for the same
    /// collective. `0` means a collective is never reported as debounced.
    pub debounce_seconds: u64,
}

impl Default for InsightSynthesizerConfig {
    /// A threshold of 5 related experiences and a 60-second debounce window.
    fn default() -> Self {
        Self {
            relation_density_threshold: 5,
            debounce_seconds: 60,
        }
    }
}
pub struct InsightSynthesizer {
config: InsightSynthesizerConfig,
last_synthesis: Mutex<HashMap<CollectiveId, Instant>>,
}
impl InsightSynthesizer {
pub fn new(config: InsightSynthesizerConfig) -> Self {
Self {
config,
last_synthesis: Mutex::new(HashMap::new()),
}
}
pub fn with_defaults() -> Self {
Self::new(InsightSynthesizerConfig::default())
}
pub fn config(&self) -> &InsightSynthesizerConfig {
&self.config
}
pub fn should_synthesize(&self, cluster_size: usize) -> bool {
cluster_size >= self.config.relation_density_threshold
}
pub fn is_debounced(&self, collective_id: CollectiveId) -> bool {
let guard = self.last_synthesis.lock().unwrap();
if let Some(last) = guard.get(&collective_id) {
last.elapsed().as_secs() < self.config.debounce_seconds
} else {
false
}
}
pub fn mark_synthesized(&self, collective_id: CollectiveId) {
let mut guard = self.last_synthesis.lock().unwrap();
guard.insert(collective_id, Instant::now());
}
pub async fn find_cluster(
&self,
start_id: ExperienceId,
substrate: &dyn SubstrateProvider,
) -> Vec<Experience> {
const MAX_CLUSTER_SIZE: usize = 50;
let mut visited: HashSet<ExperienceId> = HashSet::new();
let mut queue: VecDeque<ExperienceId> = VecDeque::new();
let mut cluster: Vec<Experience> = Vec::new();
queue.push_back(start_id);
visited.insert(start_id);
while let Some(exp_id) = queue.pop_front() {
if cluster.len() >= MAX_CLUSTER_SIZE {
break;
}
let related = match substrate.get_related(exp_id).await {
Ok(r) => r,
Err(e) => {
tracing::warn!(error = %e, "InsightSynthesizer: get_related failed");
continue;
}
};
for (experience, _relation) in related {
if !visited.contains(&experience.id) {
visited.insert(experience.id);
queue.push_back(experience.id);
cluster.push(experience);
}
}
}
tracing::debug!(cluster_size = cluster.len(), experience_id = %start_id, "Cluster found");
cluster
}
pub async fn synthesize_cluster(
&self,
cluster: &[Experience],
collective_id: CollectiveId,
provider: &dyn LlmProvider,
llm_config: &LlmConfig,
) -> Option<NewDerivedInsight> {
if cluster.is_empty() {
return None;
}
let mut experience_list = String::new();
for (i, exp) in cluster.iter().enumerate() {
experience_list.push_str(&format!(
"{}. [{}] {}\n",
i + 1,
format!("{:?}", exp.experience_type)
.split('{')
.next()
.unwrap_or("Unknown")
.trim(),
exp.content
));
}
let prompt = format!(
"You are analyzing a cluster of {} related experiences from an AI agent system. \
Synthesize them into a single concise insight (2-3 sentences) that captures \
the key pattern or learning.\n\nExperiences:\n{}",
cluster.len(),
experience_list
);
let messages = vec![
Message::system(
"You are a knowledge synthesis expert. Provide concise, actionable insights.",
),
Message::user(&prompt),
];
let response = match provider
.chat(messages, vec![], llm_config)
.instrument(tracing::debug_span!(
"synthesize_insight",
cluster_size = cluster.len()
))
.await
{
Ok(r) => r,
Err(e) => {
tracing::warn!(error = %e, "InsightSynthesizer: LLM call failed");
return None;
}
};
let content = response.content.unwrap_or_default();
if content.is_empty() {
return None;
}
let avg_confidence = if cluster.is_empty() {
0.5
} else {
cluster.iter().map(|e| e.confidence).sum::<f32>() / cluster.len() as f32
};
let domains: Vec<String> = cluster
.iter()
.flat_map(|e| e.domain.iter().cloned())
.collect::<HashSet<_>>()
.into_iter()
.collect();
Some(NewDerivedInsight {
collective_id,
content,
embedding: None, source_experience_ids: cluster.iter().map(|e| e.id).collect(),
insight_type: InsightType::Synthesis,
confidence: avg_confidence,
domain: domains,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_should_synthesize_below_threshold() {
        let synthesizer = InsightSynthesizer::with_defaults();
        // Both sizes sit under the default threshold of 5.
        for size in [3, 4] {
            assert!(!synthesizer.should_synthesize(size));
        }
    }

    #[test]
    fn test_should_synthesize_at_threshold() {
        let synthesizer = InsightSynthesizer::with_defaults();
        // Exactly at the threshold, then comfortably above it.
        for size in [5, 10] {
            assert!(synthesizer.should_synthesize(size));
        }
    }

    #[test]
    fn test_debounce_blocks_immediate_retry() {
        let synthesizer = InsightSynthesizer::with_defaults();
        let collective = CollectiveId::new();

        assert!(
            !synthesizer.is_debounced(collective),
            "Should not be debounced initially"
        );
        synthesizer.mark_synthesized(collective);
        assert!(
            synthesizer.is_debounced(collective),
            "Should be debounced after marking"
        );
    }

    #[test]
    fn test_debounce_allows_different_collective() {
        let synthesizer = InsightSynthesizer::with_defaults();
        let (marked, untouched) = (CollectiveId::new(), CollectiveId::new());

        synthesizer.mark_synthesized(marked);
        assert!(synthesizer.is_debounced(marked));
        assert!(
            !synthesizer.is_debounced(untouched),
            "Different collective should not be debounced"
        );
    }

    #[test]
    fn test_config_defaults() {
        let InsightSynthesizerConfig {
            relation_density_threshold,
            debounce_seconds,
            ..
        } = InsightSynthesizerConfig::default();
        assert_eq!(relation_density_threshold, 5);
        assert_eq!(debounce_seconds, 60);
    }

    #[test]
    fn test_zero_debounce_never_blocks() {
        // Same threshold as the default (5); only the debounce window differs.
        let synthesizer = InsightSynthesizer::new(InsightSynthesizerConfig {
            debounce_seconds: 0,
            ..InsightSynthesizerConfig::default()
        });
        let collective = CollectiveId::new();

        synthesizer.mark_synthesized(collective);
        assert!(!synthesizer.is_debounced(collective));
    }
}