// codemem_engine/consolidation/creative.rs
1use super::ConsolidationResult;
2use crate::CodememEngine;
3use codemem_core::{
4    CodememError, Edge, GraphBackend, GraphNode, NodeKind, RelationshipType, VectorBackend,
5};
6use serde_json::json;
7use std::collections::{HashMap, HashSet};
8use std::iter;
9
10impl CodememEngine {
11    /// Consolidate creative: O(n log n) semantic creative consolidation.
12    /// Uses vector KNN search per memory to find cross-type neighbors and creates
13    /// SHARES_THEME edges.
14    ///
15    /// Memory usage: O(K*768) per query instead of O(N*768) for all embeddings,
16    /// where K is the number of nearest neighbors searched (7). Only the memory
17    /// metadata (IDs + types) is kept in RAM, not the full embedding vectors.
18    pub fn consolidate_creative(&self) -> Result<ConsolidationResult, CodememError> {
19        // Load all memories with their types
20        let parsed = self.storage.list_memories_for_creative()?;
21
22        let ids_refs: Vec<&str> = parsed.iter().map(|(id, _, _)| id.as_str()).collect();
23        let memories = self.storage.get_memories_batch(&ids_refs)?;
24
25        // Build type lookup
26        let type_map: HashMap<String, String> = memories
27            .iter()
28            .map(|m| (m.id.clone(), m.memory_type.to_string()))
29            .collect();
30
31        // Load existing SHARES_THEME edges to avoid duplicates.
32        // Use a combined key string to allow borrowed lookups without cloning per check (L19).
33        let all_edges = self.storage.all_graph_edges()?;
34        let existing_edges: HashSet<String> = all_edges
35            .iter()
36            .filter(|e| {
37                e.relationship == RelationshipType::SharesTheme
38                    || e.relationship == RelationshipType::RelatesTo
39            })
40            .flat_map(|e| {
41                // H6: Use iter::once chains instead of vec![] to avoid per-edge heap alloc
42                iter::once(format!("{}\0{}", e.src, e.dst))
43                    .chain(iter::once(format!("{}\0{}", e.dst, e.src)))
44            })
45            .collect();
46
47        // X2: Instead of loading ALL embeddings into a HashMap (O(N*768) memory),
48        // we iterate over memory IDs and fetch each embedding individually from
49        // storage, then use vector KNN to find neighbors. This uses O(K*768)
50        // memory per query where K=7.
51        let memory_ids: Vec<String> = type_map.keys().cloned().collect();
52
53        let now = chrono::Utc::now();
54        let mut new_connections = 0usize;
55
56        // Collect nodes and edges to batch-insert after the loop
57        let mut pending_nodes: Vec<GraphNode> = Vec::new();
58        let mut pending_edges: Vec<Edge> = Vec::new();
59        // Track which node IDs we've already queued for insertion to avoid duplicates
60        let mut queued_node_ids: HashSet<String> = HashSet::new();
61
62        // C1: Lock ordering: graph first, then vector
63        let mut graph = self.lock_graph()?;
64        let mut vector = self.lock_vector()?;
65
66        // H1: For each memory, load its embedding on demand and find 6 nearest neighbors.
67        // Drop and re-acquire graph+vector locks every 50 iterations to yield to other threads.
68        for (iter_idx, id) in memory_ids.iter().enumerate() {
69            // H1: Yield locks every 50 iterations to avoid long lock holds
70            if iter_idx > 0 && iter_idx % 50 == 0 {
71                // Must drop in reverse acquisition order
72                drop(vector);
73                drop(graph);
74                graph = self.lock_graph()?;
75                vector = self.lock_vector()?;
76            }
77
78            let my_type = match type_map.get(id) {
79                Some(t) => t,
80                None => continue,
81            };
82
83            // Load embedding for this single memory from storage (not kept in RAM)
84            let embedding = match self.storage.get_embedding(id) {
85                Ok(Some(emb)) => emb,
86                _ => continue,
87            };
88
89            let neighbors = vector.search(&embedding, 7).unwrap_or_default();
90
91            for (neighbor_id, sim) in &neighbors {
92                if neighbor_id == id {
93                    continue;
94                }
95
96                let neighbor_type = match type_map.get(neighbor_id) {
97                    Some(t) => t,
98                    None => continue,
99                };
100
101                // L19: Use combined key to avoid cloning both ID strings per check
102                let edge_key = format!("{id}\0{neighbor_id}");
103                if existing_edges.contains(&edge_key) {
104                    continue;
105                }
106
107                let similarity = *sim as f64;
108
109                // Cross-type links at 0.35 threshold, same-type links at 0.5.
110                // Lower thresholds improve graph connectivity for conversational memories
111                // which tend to have moderate but meaningful cosine similarities.
112                let threshold = if my_type == neighbor_type { 0.5 } else { 0.35 };
113                if similarity < threshold {
114                    continue;
115                }
116
117                // M10: Ensure both nodes exist, using memory content as label when available
118                for nid in [id, neighbor_id] {
119                    if queued_node_ids.contains(nid) {
120                        continue; // Already queued for batch insert
121                    }
122                    if graph.get_node(nid).ok().flatten().is_some() {
123                        continue; // Node already exists, don't overwrite
124                    }
125                    let label = memories
126                        .iter()
127                        .find(|m| m.id == *nid)
128                        .map(|m| crate::scoring::truncate_content(&m.content, 80))
129                        .unwrap_or_else(|| nid.clone());
130                    let node = GraphNode {
131                        id: nid.clone(),
132                        kind: NodeKind::Memory,
133                        label,
134                        payload: HashMap::new(),
135                        centrality: 0.0,
136                        memory_id: Some(nid.clone()),
137                        namespace: None,
138                    };
139                    pending_nodes.push(node);
140                    queued_node_ids.insert(nid.clone());
141                }
142
143                let edge_id = format!("{id}-SHARES_THEME-{neighbor_id}");
144                let edge = Edge {
145                    id: edge_id,
146                    src: id.clone(),
147                    dst: neighbor_id.clone(),
148                    relationship: RelationshipType::SharesTheme,
149                    weight: similarity,
150                    properties: HashMap::new(),
151                    created_at: now,
152                    valid_from: Some(now),
153                    valid_to: None,
154                };
155
156                pending_edges.push(edge);
157                new_connections += 1;
158            }
159        }
160
161        // Batch-insert collected nodes and edges into storage
162        if !pending_nodes.is_empty() {
163            if let Err(e) = self.storage.insert_graph_nodes_batch(&pending_nodes) {
164                tracing::warn!(
165                    "Failed to batch-insert {} graph nodes during creative consolidation: {e}",
166                    pending_nodes.len()
167                );
168            }
169        }
170        if !pending_edges.is_empty() {
171            if let Err(e) = self.storage.insert_graph_edges_batch(&pending_edges) {
172                tracing::warn!(
173                    "Failed to batch-insert {} graph edges during creative consolidation: {e}",
174                    pending_edges.len()
175                );
176            }
177        }
178
179        // Add to in-memory graph
180        for node in pending_nodes {
181            let _ = graph.add_node(node);
182        }
183        for edge in pending_edges {
184            let _ = graph.add_edge(edge);
185        }
186
187        drop(vector);
188        drop(graph);
189
190        if let Err(e) = self
191            .storage
192            .insert_consolidation_log("creative", new_connections)
193        {
194            tracing::warn!("Failed to log creative consolidation: {e}");
195        }
196
197        Ok(ConsolidationResult {
198            cycle: "creative".to_string(),
199            affected: new_connections,
200            details: json!({
201                "algorithm": "vector_knn",
202            }),
203        })
204    }
205}