Skip to main content

pulsehive_runtime/intelligence/
insight.rs

1//! Automatic insight synthesis from experience clusters.
2//!
3//! When a cluster of related experiences exceeds the density threshold,
4//! the [`InsightSynthesizer`] uses an LLM to generate a consolidated
5//! [`DerivedInsight`](pulsedb::DerivedInsight) that captures the key pattern.
6
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::sync::Mutex;
9use std::time::Instant;
10
11use pulsedb::{
12    CollectiveId, Experience, ExperienceId, InsightType, NewDerivedInsight, SubstrateProvider,
13};
14use pulsehive_core::llm::{LlmConfig, LlmProvider, Message};
15use tracing::Instrument;
16
/// Tuning knobs for [`InsightSynthesizer`].
///
/// Controls when a cluster is large enough to be worth synthesizing and how
/// often a single collective may be synthesized.
#[derive(Debug, Clone)]
pub struct InsightSynthesizerConfig {
    /// Smallest cluster size (inclusive) that makes synthesis worthwhile.
    /// Default: 5
    pub relation_density_threshold: usize,

    /// Cool-down, in whole seconds, between two synthesis runs for one collective.
    /// Stops a burst of incoming experiences from triggering a burst of LLM calls.
    /// Default: 60
    pub debounce_seconds: u64,
}

impl Default for InsightSynthesizerConfig {
    /// A threshold of 5 related experiences and a 60-second debounce window.
    fn default() -> Self {
        let relation_density_threshold = 5;
        let debounce_seconds = 60;
        Self {
            relation_density_threshold,
            debounce_seconds,
        }
    }
}
38
39/// Synthesizes insights from clusters of related experiences using an LLM.
40///
41/// Created via [`InsightSynthesizer::new()`] with an [`InsightSynthesizerConfig`].
42pub struct InsightSynthesizer {
43    config: InsightSynthesizerConfig,
44    /// Tracks last synthesis time per collective for debouncing.
45    last_synthesis: Mutex<HashMap<CollectiveId, Instant>>,
46}
47
48impl InsightSynthesizer {
49    /// Create a new synthesizer with the given configuration.
50    pub fn new(config: InsightSynthesizerConfig) -> Self {
51        Self {
52            config,
53            last_synthesis: Mutex::new(HashMap::new()),
54        }
55    }
56
57    /// Create a new synthesizer with default configuration.
58    pub fn with_defaults() -> Self {
59        Self::new(InsightSynthesizerConfig::default())
60    }
61
62    /// Access the configuration.
63    pub fn config(&self) -> &InsightSynthesizerConfig {
64        &self.config
65    }
66
67    /// Check if synthesis should be attempted based on cluster size.
68    pub fn should_synthesize(&self, cluster_size: usize) -> bool {
69        cluster_size >= self.config.relation_density_threshold
70    }
71
72    /// Check if a collective is still within the debounce window.
73    pub fn is_debounced(&self, collective_id: CollectiveId) -> bool {
74        let guard = self.last_synthesis.lock().unwrap();
75        if let Some(last) = guard.get(&collective_id) {
76            last.elapsed().as_secs() < self.config.debounce_seconds
77        } else {
78            false
79        }
80    }
81
82    /// Record that synthesis was performed for a collective (updates debounce timer).
83    pub fn mark_synthesized(&self, collective_id: CollectiveId) {
84        let mut guard = self.last_synthesis.lock().unwrap();
85        guard.insert(collective_id, Instant::now());
86    }
87
88    /// Find all experiences connected to `start_id` via relations (BFS traversal).
89    ///
90    /// Traverses the relation graph starting from the given experience,
91    /// collecting all reachable experiences. Capped at 50 to prevent
92    /// runaway traversal on dense graphs.
93    pub async fn find_cluster(
94        &self,
95        start_id: ExperienceId,
96        substrate: &dyn SubstrateProvider,
97    ) -> Vec<Experience> {
98        const MAX_CLUSTER_SIZE: usize = 50;
99
100        let mut visited: HashSet<ExperienceId> = HashSet::new();
101        let mut queue: VecDeque<ExperienceId> = VecDeque::new();
102        let mut cluster: Vec<Experience> = Vec::new();
103
104        queue.push_back(start_id);
105        visited.insert(start_id);
106
107        while let Some(exp_id) = queue.pop_front() {
108            if cluster.len() >= MAX_CLUSTER_SIZE {
109                break;
110            }
111
112            // Get all related experiences
113            let related = match substrate.get_related(exp_id).await {
114                Ok(r) => r,
115                Err(e) => {
116                    tracing::warn!(error = %e, "InsightSynthesizer: get_related failed");
117                    continue;
118                }
119            };
120
121            for (experience, _relation) in related {
122                if !visited.contains(&experience.id) {
123                    visited.insert(experience.id);
124                    queue.push_back(experience.id);
125                    cluster.push(experience);
126                }
127            }
128        }
129
130        tracing::debug!(cluster_size = cluster.len(), experience_id = %start_id, "Cluster found");
131        cluster
132    }
133
134    /// Synthesize a cluster of related experiences into a consolidated insight using an LLM.
135    ///
136    /// Builds a prompt from experience contents, calls the LLM, and returns a
137    /// `NewDerivedInsight` ready to store. Returns `None` if synthesis fails.
138    pub async fn synthesize_cluster(
139        &self,
140        cluster: &[Experience],
141        collective_id: CollectiveId,
142        provider: &dyn LlmProvider,
143        llm_config: &LlmConfig,
144    ) -> Option<NewDerivedInsight> {
145        if cluster.is_empty() {
146            return None;
147        }
148
149        // Build synthesis prompt from cluster
150        let mut experience_list = String::new();
151        for (i, exp) in cluster.iter().enumerate() {
152            experience_list.push_str(&format!(
153                "{}. [{}] {}\n",
154                i + 1,
155                format!("{:?}", exp.experience_type)
156                    .split('{')
157                    .next()
158                    .unwrap_or("Unknown")
159                    .trim(),
160                exp.content
161            ));
162        }
163
164        let prompt = format!(
165            "You are analyzing a cluster of {} related experiences from an AI agent system. \
166             Synthesize them into a single concise insight (2-3 sentences) that captures \
167             the key pattern or learning.\n\nExperiences:\n{}",
168            cluster.len(),
169            experience_list
170        );
171
172        let messages = vec![
173            Message::system(
174                "You are a knowledge synthesis expert. Provide concise, actionable insights.",
175            ),
176            Message::user(&prompt),
177        ];
178
179        // Call LLM for synthesis
180        let response = match provider
181            .chat(messages, vec![], llm_config)
182            .instrument(tracing::debug_span!(
183                "synthesize_insight",
184                cluster_size = cluster.len()
185            ))
186            .await
187        {
188            Ok(r) => r,
189            Err(e) => {
190                tracing::warn!(error = %e, "InsightSynthesizer: LLM call failed");
191                return None;
192            }
193        };
194
195        let content = response.content.unwrap_or_default();
196        if content.is_empty() {
197            return None;
198        }
199
200        // Compute average confidence from sources
201        let avg_confidence = if cluster.is_empty() {
202            0.5
203        } else {
204            cluster.iter().map(|e| e.confidence).sum::<f32>() / cluster.len() as f32
205        };
206
207        // Collect unique domains from all sources
208        let domains: Vec<String> = cluster
209            .iter()
210            .flat_map(|e| e.domain.iter().cloned())
211            .collect::<HashSet<_>>()
212            .into_iter()
213            .collect();
214
215        Some(NewDerivedInsight {
216            collective_id,
217            content,
218            embedding: None, // PulseDB builtin embeddings compute this
219            source_experience_ids: cluster.iter().map(|e| e.id).collect(),
220            insight_type: InsightType::Synthesis,
221            confidence: avg_confidence,
222            domain: domains,
223        })
224    }
225}
226
227#[cfg(test)]
228mod tests {
229    use super::*;
230
231    #[test]
232    fn test_should_synthesize_below_threshold() {
233        let synth = InsightSynthesizer::with_defaults(); // threshold = 5
234        assert!(!synth.should_synthesize(3));
235        assert!(!synth.should_synthesize(4));
236    }
237
238    #[test]
239    fn test_should_synthesize_at_threshold() {
240        let synth = InsightSynthesizer::with_defaults();
241        assert!(synth.should_synthesize(5));
242        assert!(synth.should_synthesize(10));
243    }
244
245    #[test]
246    fn test_debounce_blocks_immediate_retry() {
247        let synth = InsightSynthesizer::with_defaults(); // debounce = 60s
248        let cid = CollectiveId::new();
249
250        assert!(
251            !synth.is_debounced(cid),
252            "Should not be debounced initially"
253        );
254
255        synth.mark_synthesized(cid);
256        assert!(synth.is_debounced(cid), "Should be debounced after marking");
257    }
258
259    #[test]
260    fn test_debounce_allows_different_collective() {
261        let synth = InsightSynthesizer::with_defaults();
262        let cid_a = CollectiveId::new();
263        let cid_b = CollectiveId::new();
264
265        synth.mark_synthesized(cid_a);
266        assert!(synth.is_debounced(cid_a));
267        assert!(
268            !synth.is_debounced(cid_b),
269            "Different collective should not be debounced"
270        );
271    }
272
273    #[test]
274    fn test_config_defaults() {
275        let config = InsightSynthesizerConfig::default();
276        assert_eq!(config.relation_density_threshold, 5);
277        assert_eq!(config.debounce_seconds, 60);
278    }
279
280    #[test]
281    fn test_zero_debounce_never_blocks() {
282        let synth = InsightSynthesizer::new(InsightSynthesizerConfig {
283            relation_density_threshold: 5,
284            debounce_seconds: 0,
285        });
286        let cid = CollectiveId::new();
287
288        synth.mark_synthesized(cid);
289        // With 0s debounce, should not be debounced
290        assert!(!synth.is_debounced(cid));
291    }
292}