codemem_engine/consolidation/
creative.rs1use super::ConsolidationResult;
2use crate::CodememEngine;
3use codemem_core::{
4 CodememError, Edge, GraphBackend, GraphNode, NodeKind, RelationshipType, VectorBackend,
5};
6use serde_json::json;
7use std::collections::{HashMap, HashSet};
8use std::iter;
9
10impl CodememEngine {
11 pub fn consolidate_creative(&self) -> Result<ConsolidationResult, CodememError> {
19 let parsed = self.storage.list_memories_for_creative()?;
21
22 let ids_refs: Vec<&str> = parsed.iter().map(|(id, _, _)| id.as_str()).collect();
23 let memories = self.storage.get_memories_batch(&ids_refs)?;
24
25 let type_map: HashMap<String, String> = memories
27 .iter()
28 .map(|m| (m.id.clone(), m.memory_type.to_string()))
29 .collect();
30
31 let all_edges = self.storage.all_graph_edges()?;
34 let existing_edges: HashSet<String> = all_edges
35 .iter()
36 .filter(|e| {
37 e.relationship == RelationshipType::SharesTheme
38 || e.relationship == RelationshipType::RelatesTo
39 })
40 .flat_map(|e| {
41 iter::once(format!("{}\0{}", e.src, e.dst))
43 .chain(iter::once(format!("{}\0{}", e.dst, e.src)))
44 })
45 .collect();
46
47 let memory_ids: Vec<String> = type_map.keys().cloned().collect();
52
53 let now = chrono::Utc::now();
54 let mut new_connections = 0usize;
55
56 let mut pending_nodes: Vec<GraphNode> = Vec::new();
58 let mut pending_edges: Vec<Edge> = Vec::new();
59 let mut queued_node_ids: HashSet<String> = HashSet::new();
61
62 let mut graph = self.lock_graph()?;
64 let mut vector = self.lock_vector()?;
65
66 for (iter_idx, id) in memory_ids.iter().enumerate() {
69 if iter_idx > 0 && iter_idx % 50 == 0 {
71 drop(vector);
73 drop(graph);
74 graph = self.lock_graph()?;
75 vector = self.lock_vector()?;
76 }
77
78 let my_type = match type_map.get(id) {
79 Some(t) => t,
80 None => continue,
81 };
82
83 let embedding = match self.storage.get_embedding(id) {
85 Ok(Some(emb)) => emb,
86 _ => continue,
87 };
88
89 let neighbors = vector.search(&embedding, 7).unwrap_or_default();
90
91 for (neighbor_id, sim) in &neighbors {
92 if neighbor_id == id {
93 continue;
94 }
95
96 let neighbor_type = match type_map.get(neighbor_id) {
97 Some(t) => t,
98 None => continue,
99 };
100
101 let edge_key = format!("{id}\0{neighbor_id}");
103 if existing_edges.contains(&edge_key) {
104 continue;
105 }
106
107 let similarity = *sim as f64;
108
109 let threshold = if my_type == neighbor_type { 0.5 } else { 0.35 };
113 if similarity < threshold {
114 continue;
115 }
116
117 for nid in [id, neighbor_id] {
119 if queued_node_ids.contains(nid) {
120 continue; }
122 if graph.get_node(nid).ok().flatten().is_some() {
123 continue; }
125 let label = memories
126 .iter()
127 .find(|m| m.id == *nid)
128 .map(|m| crate::scoring::truncate_content(&m.content, 80))
129 .unwrap_or_else(|| nid.clone());
130 let node = GraphNode {
131 id: nid.clone(),
132 kind: NodeKind::Memory,
133 label,
134 payload: HashMap::new(),
135 centrality: 0.0,
136 memory_id: Some(nid.clone()),
137 namespace: None,
138 };
139 pending_nodes.push(node);
140 queued_node_ids.insert(nid.clone());
141 }
142
143 let edge_id = format!("{id}-SHARES_THEME-{neighbor_id}");
144 let edge = Edge {
145 id: edge_id,
146 src: id.clone(),
147 dst: neighbor_id.clone(),
148 relationship: RelationshipType::SharesTheme,
149 weight: similarity,
150 properties: HashMap::new(),
151 created_at: now,
152 valid_from: Some(now),
153 valid_to: None,
154 };
155
156 pending_edges.push(edge);
157 new_connections += 1;
158 }
159 }
160
161 if !pending_nodes.is_empty() {
163 if let Err(e) = self.storage.insert_graph_nodes_batch(&pending_nodes) {
164 tracing::warn!(
165 "Failed to batch-insert {} graph nodes during creative consolidation: {e}",
166 pending_nodes.len()
167 );
168 }
169 }
170 if !pending_edges.is_empty() {
171 if let Err(e) = self.storage.insert_graph_edges_batch(&pending_edges) {
172 tracing::warn!(
173 "Failed to batch-insert {} graph edges during creative consolidation: {e}",
174 pending_edges.len()
175 );
176 }
177 }
178
179 for node in pending_nodes {
181 let _ = graph.add_node(node);
182 }
183 for edge in pending_edges {
184 let _ = graph.add_edge(edge);
185 }
186
187 drop(vector);
188 drop(graph);
189
190 if let Err(e) = self
191 .storage
192 .insert_consolidation_log("creative", new_connections)
193 {
194 tracing::warn!("Failed to log creative consolidation: {e}");
195 }
196
197 Ok(ConsolidationResult {
198 cycle: "creative".to_string(),
199 affected: new_connections,
200 details: json!({
201 "algorithm": "vector_knn",
202 }),
203 })
204 }
205}