// codemem_engine/consolidation/summarize.rs

1use super::ConsolidationResult;
2use crate::CodememEngine;
3use codemem_core::{CodememError, Edge, GraphBackend, MemoryNode, MemoryType, RelationshipType};
4use serde_json::json;
5use std::collections::HashMap;
6
7impl CodememEngine {
8    /// Consolidate summarize: LLM-powered consolidation that finds
9    /// connected components, summarizes large clusters into Insight memories
10    /// linked via SUMMARIZES edges.
11    pub fn consolidate_summarize(
12        &self,
13        min_cluster_size: Option<usize>,
14    ) -> Result<ConsolidationResult, CodememError> {
15        let min_cluster_size = min_cluster_size.unwrap_or(5);
16
17        let provider = crate::compress::CompressProvider::from_env();
18        if !provider.is_enabled() {
19            return Err(CodememError::Config(
20                "CODEMEM_COMPRESS_PROVIDER env var not set. \
21                 Set it to 'ollama', 'openai', or 'anthropic' to enable LLM-powered consolidation."
22                    .to_string(),
23            ));
24        }
25
26        // Find connected components via the graph
27        let graph = self.lock_graph()?;
28        let components = graph.connected_components();
29        drop(graph);
30
31        let large_clusters: Vec<&Vec<String>> = components
32            .iter()
33            .filter(|c| c.len() >= min_cluster_size)
34            .collect();
35
36        if large_clusters.is_empty() {
37            return Ok(ConsolidationResult {
38                cycle: "summarize".to_string(),
39                affected: 0,
40                details: json!({
41                    "clusters_found": 0,
42                    "min_cluster_size": min_cluster_size,
43                    "message": format!("No clusters with {} or more members found", min_cluster_size),
44                }),
45            });
46        }
47
48        let mut summarized_count = 0usize;
49        let mut created_ids: Vec<String> = Vec::new();
50
51        for cluster in &large_clusters {
52            let mut contents: Vec<String> = Vec::new();
53            let mut source_ids: Vec<String> = Vec::new();
54            let mut all_tags: Vec<String> = Vec::new();
55
56            // M12: Acquire graph lock once before the inner loop, collect all memory IDs,
57            // then drop the lock before batch-fetching memories from storage.
58            let memory_ids: Vec<String> = {
59                let graph = self.lock_graph()?;
60                cluster
61                    .iter()
62                    .filter_map(|node_id| {
63                        graph
64                            .get_node(node_id)
65                            .ok()
66                            .flatten()
67                            .and_then(|node| node.memory_id.clone())
68                    })
69                    .collect()
70            };
71
72            for mid in &memory_ids {
73                if let Ok(Some(mem)) = self.storage.get_memory_no_touch(mid) {
74                    contents.push(mem.content.clone());
75                    source_ids.push(mid.clone());
76                    all_tags.extend(mem.tags.clone());
77                }
78            }
79
80            if contents.len() < 2 {
81                continue;
82            }
83
84            let combined = contents.join("\n---\n");
85            let summary = match provider.compress(&combined, "consolidate_summarize", None) {
86                Some(s) => s,
87                None => continue,
88            };
89
90            all_tags.sort();
91            all_tags.dedup();
92
93            let mut mem = MemoryNode::new(summary, MemoryType::Insight);
94            let new_id = mem.id.clone();
95            mem.importance = 0.7;
96            mem.tags = all_tags;
97
98            // M4: Use persist_memory_no_save to skip per-memory index save,
99            // then call save_index() once after the entire loop.
100            if self.persist_memory_no_save(&mem).is_err() {
101                tracing::warn!("Failed to persist summary memory: {new_id}");
102                continue;
103            }
104
105            let now = chrono::Utc::now();
106            if let Ok(mut graph) = self.lock_graph() {
107                for sid in &source_ids {
108                    let edge = Edge {
109                        id: format!("{new_id}-SUMMARIZES-{sid}"),
110                        src: new_id.clone(),
111                        dst: sid.clone(),
112                        relationship: RelationshipType::Summarizes,
113                        weight: 1.0,
114                        properties: HashMap::new(),
115                        created_at: now,
116                        valid_from: Some(now),
117                        valid_to: None,
118                    };
119                    if let Err(e) = self.storage.insert_graph_edge(&edge) {
120                        tracing::warn!("Failed to persist SUMMARIZES edge: {e}");
121                    }
122                    let _ = graph.add_edge(edge);
123                }
124            }
125
126            summarized_count += 1;
127            created_ids.push(new_id);
128        }
129
130        // M4: Save vector index once after all summaries are persisted
131        if summarized_count > 0 {
132            self.save_index();
133        }
134
135        if let Err(e) = self
136            .storage
137            .insert_consolidation_log("summarize", summarized_count)
138        {
139            tracing::warn!("Failed to log summarize consolidation: {e}");
140        }
141
142        Ok(ConsolidationResult {
143            cycle: "summarize".to_string(),
144            affected: summarized_count,
145            details: json!({
146                "clusters_found": large_clusters.len(),
147                "summarized": summarized_count,
148                "created_ids": created_ids,
149                "min_cluster_size": min_cluster_size,
150            }),
151        })
152    }
153}