pulsehive_runtime/intelligence/
insight.rs1use std::collections::{HashMap, HashSet, VecDeque};
8use std::sync::Mutex;
9use std::time::Instant;
10
11use pulsedb::{
12 CollectiveId, Experience, ExperienceId, InsightType, NewDerivedInsight, SubstrateProvider,
13};
14use pulsehive_core::llm::{LlmConfig, LlmProvider, Message};
15use tracing::Instrument;
16
/// Tunable settings for [`InsightSynthesizer`].
#[derive(Clone, Debug)]
pub struct InsightSynthesizerConfig {
    /// Minimum cluster size at which `InsightSynthesizer::should_synthesize`
    /// reports `true` (the comparison is `cluster_size >= threshold`).
    pub relation_density_threshold: usize,

    /// Length, in whole seconds, of the window after
    /// `InsightSynthesizer::mark_synthesized` during which
    /// `InsightSynthesizer::is_debounced` reports `true` for the same collective.
    pub debounce_seconds: u64,
}
29
30impl Default for InsightSynthesizerConfig {
31 fn default() -> Self {
32 Self {
33 relation_density_threshold: 5,
34 debounce_seconds: 60,
35 }
36 }
37}
38
39pub struct InsightSynthesizer {
43 config: InsightSynthesizerConfig,
44 last_synthesis: Mutex<HashMap<CollectiveId, Instant>>,
46}
47
48impl InsightSynthesizer {
49 pub fn new(config: InsightSynthesizerConfig) -> Self {
51 Self {
52 config,
53 last_synthesis: Mutex::new(HashMap::new()),
54 }
55 }
56
57 pub fn with_defaults() -> Self {
59 Self::new(InsightSynthesizerConfig::default())
60 }
61
62 pub fn config(&self) -> &InsightSynthesizerConfig {
64 &self.config
65 }
66
67 pub fn should_synthesize(&self, cluster_size: usize) -> bool {
69 cluster_size >= self.config.relation_density_threshold
70 }
71
72 pub fn is_debounced(&self, collective_id: CollectiveId) -> bool {
74 let guard = self.last_synthesis.lock().unwrap();
75 if let Some(last) = guard.get(&collective_id) {
76 last.elapsed().as_secs() < self.config.debounce_seconds
77 } else {
78 false
79 }
80 }
81
82 pub fn mark_synthesized(&self, collective_id: CollectiveId) {
84 let mut guard = self.last_synthesis.lock().unwrap();
85 guard.insert(collective_id, Instant::now());
86 }
87
88 pub async fn find_cluster(
94 &self,
95 start_id: ExperienceId,
96 substrate: &dyn SubstrateProvider,
97 ) -> Vec<Experience> {
98 const MAX_CLUSTER_SIZE: usize = 50;
99
100 let mut visited: HashSet<ExperienceId> = HashSet::new();
101 let mut queue: VecDeque<ExperienceId> = VecDeque::new();
102 let mut cluster: Vec<Experience> = Vec::new();
103
104 queue.push_back(start_id);
105 visited.insert(start_id);
106
107 while let Some(exp_id) = queue.pop_front() {
108 if cluster.len() >= MAX_CLUSTER_SIZE {
109 break;
110 }
111
112 let related = match substrate.get_related(exp_id).await {
114 Ok(r) => r,
115 Err(e) => {
116 tracing::warn!(error = %e, "InsightSynthesizer: get_related failed");
117 continue;
118 }
119 };
120
121 for (experience, _relation) in related {
122 if !visited.contains(&experience.id) {
123 visited.insert(experience.id);
124 queue.push_back(experience.id);
125 cluster.push(experience);
126 }
127 }
128 }
129
130 tracing::debug!(cluster_size = cluster.len(), experience_id = %start_id, "Cluster found");
131 cluster
132 }
133
134 pub async fn synthesize_cluster(
139 &self,
140 cluster: &[Experience],
141 collective_id: CollectiveId,
142 provider: &dyn LlmProvider,
143 llm_config: &LlmConfig,
144 ) -> Option<NewDerivedInsight> {
145 if cluster.is_empty() {
146 return None;
147 }
148
149 let mut experience_list = String::new();
151 for (i, exp) in cluster.iter().enumerate() {
152 experience_list.push_str(&format!(
153 "{}. [{}] {}\n",
154 i + 1,
155 format!("{:?}", exp.experience_type)
156 .split('{')
157 .next()
158 .unwrap_or("Unknown")
159 .trim(),
160 exp.content
161 ));
162 }
163
164 let prompt = format!(
165 "You are analyzing a cluster of {} related experiences from an AI agent system. \
166 Synthesize them into a single concise insight (2-3 sentences) that captures \
167 the key pattern or learning.\n\nExperiences:\n{}",
168 cluster.len(),
169 experience_list
170 );
171
172 let messages = vec![
173 Message::system(
174 "You are a knowledge synthesis expert. Provide concise, actionable insights.",
175 ),
176 Message::user(&prompt),
177 ];
178
179 let response = match provider
181 .chat(messages, vec![], llm_config)
182 .instrument(tracing::debug_span!(
183 "synthesize_insight",
184 cluster_size = cluster.len()
185 ))
186 .await
187 {
188 Ok(r) => r,
189 Err(e) => {
190 tracing::warn!(error = %e, "InsightSynthesizer: LLM call failed");
191 return None;
192 }
193 };
194
195 let content = response.content.unwrap_or_default();
196 if content.is_empty() {
197 return None;
198 }
199
200 let avg_confidence = if cluster.is_empty() {
202 0.5
203 } else {
204 cluster.iter().map(|e| e.confidence).sum::<f32>() / cluster.len() as f32
205 };
206
207 let domains: Vec<String> = cluster
209 .iter()
210 .flat_map(|e| e.domain.iter().cloned())
211 .collect::<HashSet<_>>()
212 .into_iter()
213 .collect();
214
215 Some(NewDerivedInsight {
216 collective_id,
217 content,
218 embedding: None, source_experience_ids: cluster.iter().map(|e| e.id).collect(),
220 insight_type: InsightType::Synthesis,
221 confidence: avg_confidence,
222 domain: domains,
223 })
224 }
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Cluster sizes under the default threshold of 5 must not trigger synthesis.
    #[test]
    fn test_should_synthesize_below_threshold() {
        let synthesizer = InsightSynthesizer::with_defaults();
        for size in [3, 4] {
            assert!(!synthesizer.should_synthesize(size));
        }
    }

    /// Cluster sizes at or above the default threshold of 5 trigger synthesis.
    #[test]
    fn test_should_synthesize_at_threshold() {
        let synthesizer = InsightSynthesizer::with_defaults();
        for size in [5, 10] {
            assert!(synthesizer.should_synthesize(size));
        }
    }

    /// A collective is debounced right after it has been marked.
    #[test]
    fn test_debounce_blocks_immediate_retry() {
        let synthesizer = InsightSynthesizer::with_defaults();
        let collective = CollectiveId::new();

        let debounced_before = synthesizer.is_debounced(collective);
        assert!(!debounced_before, "Should not be debounced initially");

        synthesizer.mark_synthesized(collective);
        let debounced_after = synthesizer.is_debounced(collective);
        assert!(debounced_after, "Should be debounced after marking");
    }

    /// Marking one collective leaves other collectives untouched.
    #[test]
    fn test_debounce_allows_different_collective() {
        let synthesizer = InsightSynthesizer::with_defaults();
        let marked = CollectiveId::new();
        let untouched = CollectiveId::new();

        synthesizer.mark_synthesized(marked);

        assert!(synthesizer.is_debounced(marked));
        let other_debounced = synthesizer.is_debounced(untouched);
        assert!(
            !other_debounced,
            "Different collective should not be debounced"
        );
    }

    /// The default configuration uses a threshold of 5 and a 60-second debounce.
    #[test]
    fn test_config_defaults() {
        let defaults = InsightSynthesizerConfig::default();
        assert_eq!(defaults.relation_density_threshold, 5);
        assert_eq!(defaults.debounce_seconds, 60);
    }

    /// With a zero-second window, a freshly marked collective is not debounced.
    #[test]
    fn test_zero_debounce_never_blocks() {
        let config = InsightSynthesizerConfig {
            relation_density_threshold: 5,
            debounce_seconds: 0,
        };
        let synthesizer = InsightSynthesizer::new(config);
        let collective = CollectiveId::new();

        synthesizer.mark_synthesized(collective);

        assert!(!synthesizer.is_debounced(collective));
    }
}