Skip to main content

codemem_engine/consolidation/
creative.rs

1use super::ConsolidationResult;
2use crate::CodememEngine;
3use codemem_core::{CodememError, Edge, GraphNode, NodeKind, RelationshipType};
4use serde_json::json;
5use std::collections::{HashMap, HashSet};
6use std::iter;
7
8impl CodememEngine {
9    /// Consolidate creative: O(n log n) semantic creative consolidation.
10    /// Uses vector KNN search per memory to find cross-type neighbors and creates
11    /// SHARES_THEME edges.
12    ///
13    /// Memory usage: O(K*768) per query instead of O(N*768) for all embeddings,
14    /// where K is the number of nearest neighbors searched (7). Only the memory
15    /// metadata (IDs + types) is kept in RAM, not the full embedding vectors.
16    pub fn consolidate_creative(&self) -> Result<ConsolidationResult, CodememError> {
17        // Load all memories with their types
18        let parsed = self.storage.list_memories_for_creative()?;
19
20        let ids_refs: Vec<&str> = parsed.iter().map(|(id, _, _)| id.as_str()).collect();
21        let memories = self.storage.get_memories_batch(&ids_refs)?;
22
23        // Build type lookup
24        let type_map: HashMap<String, String> = memories
25            .iter()
26            .map(|m| (m.id.clone(), m.memory_type.to_string()))
27            .collect();
28
29        // Load existing SHARES_THEME edges to avoid duplicates.
30        // Use a combined key string to allow borrowed lookups without cloning per check (L19).
31        let all_edges = self.storage.all_graph_edges()?;
32        let existing_edges: HashSet<String> = all_edges
33            .iter()
34            .filter(|e| {
35                e.relationship == RelationshipType::SharesTheme
36                    || e.relationship == RelationshipType::RelatesTo
37            })
38            .flat_map(|e| {
39                // H6: Use iter::once chains instead of vec![] to avoid per-edge heap alloc
40                iter::once(format!("{}\0{}", e.src, e.dst))
41                    .chain(iter::once(format!("{}\0{}", e.dst, e.src)))
42            })
43            .collect();
44
45        // X2: Instead of loading ALL embeddings into a HashMap (O(N*768) memory),
46        // we iterate over memory IDs and fetch each embedding individually from
47        // storage, then use vector KNN to find neighbors. This uses O(K*768)
48        // memory per query where K=7.
49        let memory_ids: Vec<String> = type_map.keys().cloned().collect();
50
51        let now = chrono::Utc::now();
52        let mut new_connections = 0usize;
53
54        // Collect nodes and edges to batch-insert after the loop
55        let mut pending_nodes: Vec<GraphNode> = Vec::new();
56        let mut pending_edges: Vec<Edge> = Vec::new();
57        // Track which node IDs we've already queued for insertion to avoid duplicates
58        let mut queued_node_ids: HashSet<String> = HashSet::new();
59
60        // C1: Lock ordering: graph first, then vector
61        let mut graph = self.lock_graph()?;
62        let mut vector = self.lock_vector()?;
63
64        // H1: For each memory, load its embedding on demand and find 6 nearest neighbors.
65        // Drop and re-acquire graph+vector locks every 50 iterations to yield to other threads.
66        for (iter_idx, id) in memory_ids.iter().enumerate() {
67            // H1: Yield locks every 50 iterations to avoid long lock holds
68            if iter_idx > 0 && iter_idx % 50 == 0 {
69                // Must drop in reverse acquisition order
70                drop(vector);
71                drop(graph);
72                graph = self.lock_graph()?;
73                vector = self.lock_vector()?;
74            }
75
76            let my_type = match type_map.get(id) {
77                Some(t) => t,
78                None => continue,
79            };
80
81            // Load embedding for this single memory from storage (not kept in RAM)
82            let embedding = match self.storage.get_embedding(id) {
83                Ok(Some(emb)) => emb,
84                _ => continue,
85            };
86
87            let neighbors = vector.search(&embedding, 7).unwrap_or_default();
88
89            for (neighbor_id, sim) in &neighbors {
90                if neighbor_id == id {
91                    continue;
92                }
93
94                let neighbor_type = match type_map.get(neighbor_id) {
95                    Some(t) => t,
96                    None => continue,
97                };
98
99                // L19: Use combined key to avoid cloning both ID strings per check
100                let edge_key = format!("{id}\0{neighbor_id}");
101                if existing_edges.contains(&edge_key) {
102                    continue;
103                }
104
105                let similarity = *sim as f64;
106
107                // Cross-type links at 0.35 threshold, same-type links at 0.5.
108                // Lower thresholds improve graph connectivity for conversational memories
109                // which tend to have moderate but meaningful cosine similarities.
110                let threshold = if my_type == neighbor_type { 0.5 } else { 0.35 };
111                if similarity < threshold {
112                    continue;
113                }
114
115                // M10: Ensure both nodes exist, using memory content as label when available
116                for nid in [id, neighbor_id] {
117                    if queued_node_ids.contains(nid) {
118                        continue; // Already queued for batch insert
119                    }
120                    if graph.get_node(nid).ok().flatten().is_some() {
121                        continue; // Node already exists, don't overwrite
122                    }
123                    let label = memories
124                        .iter()
125                        .find(|m| m.id == *nid)
126                        .map(|m| crate::scoring::truncate_content(&m.content, 80))
127                        .unwrap_or_else(|| nid.clone());
128                    let node = GraphNode {
129                        id: nid.clone(),
130                        kind: NodeKind::Memory,
131                        label,
132                        payload: HashMap::new(),
133                        centrality: 0.0,
134                        memory_id: Some(nid.clone()),
135                        namespace: None,
136                        valid_from: None,
137                        valid_to: None,
138                    };
139                    pending_nodes.push(node);
140                    queued_node_ids.insert(nid.clone());
141                }
142
143                let edge_id = format!("{id}-SHARES_THEME-{neighbor_id}");
144                let edge = Edge {
145                    id: edge_id,
146                    src: id.clone(),
147                    dst: neighbor_id.clone(),
148                    relationship: RelationshipType::SharesTheme,
149                    weight: similarity,
150                    properties: HashMap::new(),
151                    created_at: now,
152                    valid_from: Some(now),
153                    valid_to: None,
154                };
155
156                pending_edges.push(edge);
157                new_connections += 1;
158            }
159        }
160
161        // Batch-insert collected nodes and edges into storage
162        if !pending_nodes.is_empty() {
163            if let Err(e) = self.storage.insert_graph_nodes_batch(&pending_nodes) {
164                tracing::warn!(
165                    "Failed to batch-insert {} graph nodes during creative consolidation: {e}",
166                    pending_nodes.len()
167                );
168            }
169        }
170        if !pending_edges.is_empty() {
171            if let Err(e) = self.storage.insert_graph_edges_batch(&pending_edges) {
172                tracing::warn!(
173                    "Failed to batch-insert {} graph edges during creative consolidation: {e}",
174                    pending_edges.len()
175                );
176            }
177        }
178
179        // Add to in-memory graph
180        for node in pending_nodes {
181            let _ = graph.add_node(node);
182        }
183        for edge in pending_edges {
184            let _ = graph.add_edge(edge);
185        }
186
187        drop(vector);
188        drop(graph);
189
190        if let Err(e) = self
191            .storage
192            .insert_consolidation_log("creative", new_connections)
193        {
194            tracing::warn!("Failed to log creative consolidation: {e}");
195        }
196
197        Ok(ConsolidationResult {
198            cycle: "creative".to_string(),
199            affected: new_connections,
200            details: json!({
201                "algorithm": "vector_knn",
202            }),
203        })
204    }
205}