codemem_engine/consolidation/
creative.rs1use super::ConsolidationResult;
2use crate::CodememEngine;
3use codemem_core::{CodememError, Edge, GraphNode, NodeKind, RelationshipType};
4use serde_json::json;
5use std::collections::{HashMap, HashSet};
6use std::iter;
7
8impl CodememEngine {
9 pub fn consolidate_creative(&self) -> Result<ConsolidationResult, CodememError> {
17 let parsed = self.storage.list_memories_for_creative()?;
19
20 let ids_refs: Vec<&str> = parsed.iter().map(|(id, _, _)| id.as_str()).collect();
21 let memories = self.storage.get_memories_batch(&ids_refs)?;
22
23 let type_map: HashMap<String, String> = memories
25 .iter()
26 .map(|m| (m.id.clone(), m.memory_type.to_string()))
27 .collect();
28
29 let all_edges = self.storage.all_graph_edges()?;
32 let existing_edges: HashSet<String> = all_edges
33 .iter()
34 .filter(|e| {
35 e.relationship == RelationshipType::SharesTheme
36 || e.relationship == RelationshipType::RelatesTo
37 })
38 .flat_map(|e| {
39 iter::once(format!("{}\0{}", e.src, e.dst))
41 .chain(iter::once(format!("{}\0{}", e.dst, e.src)))
42 })
43 .collect();
44
45 let memory_ids: Vec<String> = type_map.keys().cloned().collect();
50
51 let now = chrono::Utc::now();
52 let mut new_connections = 0usize;
53
54 let mut pending_nodes: Vec<GraphNode> = Vec::new();
56 let mut pending_edges: Vec<Edge> = Vec::new();
57 let mut queued_node_ids: HashSet<String> = HashSet::new();
59
60 let mut graph = self.lock_graph()?;
62 let mut vector = self.lock_vector()?;
63
64 for (iter_idx, id) in memory_ids.iter().enumerate() {
67 if iter_idx > 0 && iter_idx % 50 == 0 {
69 drop(vector);
71 drop(graph);
72 graph = self.lock_graph()?;
73 vector = self.lock_vector()?;
74 }
75
76 let my_type = match type_map.get(id) {
77 Some(t) => t,
78 None => continue,
79 };
80
81 let embedding = match self.storage.get_embedding(id) {
83 Ok(Some(emb)) => emb,
84 _ => continue,
85 };
86
87 let neighbors = vector.search(&embedding, 7).unwrap_or_default();
88
89 for (neighbor_id, sim) in &neighbors {
90 if neighbor_id == id {
91 continue;
92 }
93
94 let neighbor_type = match type_map.get(neighbor_id) {
95 Some(t) => t,
96 None => continue,
97 };
98
99 let edge_key = format!("{id}\0{neighbor_id}");
101 if existing_edges.contains(&edge_key) {
102 continue;
103 }
104
105 let similarity = *sim as f64;
106
107 let threshold = if my_type == neighbor_type { 0.5 } else { 0.35 };
111 if similarity < threshold {
112 continue;
113 }
114
115 for nid in [id, neighbor_id] {
117 if queued_node_ids.contains(nid) {
118 continue; }
120 if graph.get_node(nid).ok().flatten().is_some() {
121 continue; }
123 let label = memories
124 .iter()
125 .find(|m| m.id == *nid)
126 .map(|m| crate::scoring::truncate_content(&m.content, 80))
127 .unwrap_or_else(|| nid.clone());
128 let node = GraphNode {
129 id: nid.clone(),
130 kind: NodeKind::Memory,
131 label,
132 payload: HashMap::new(),
133 centrality: 0.0,
134 memory_id: Some(nid.clone()),
135 namespace: None,
136 valid_from: None,
137 valid_to: None,
138 };
139 pending_nodes.push(node);
140 queued_node_ids.insert(nid.clone());
141 }
142
143 let edge_id = format!("{id}-SHARES_THEME-{neighbor_id}");
144 let edge = Edge {
145 id: edge_id,
146 src: id.clone(),
147 dst: neighbor_id.clone(),
148 relationship: RelationshipType::SharesTheme,
149 weight: similarity,
150 properties: HashMap::new(),
151 created_at: now,
152 valid_from: Some(now),
153 valid_to: None,
154 };
155
156 pending_edges.push(edge);
157 new_connections += 1;
158 }
159 }
160
161 if !pending_nodes.is_empty() {
163 if let Err(e) = self.storage.insert_graph_nodes_batch(&pending_nodes) {
164 tracing::warn!(
165 "Failed to batch-insert {} graph nodes during creative consolidation: {e}",
166 pending_nodes.len()
167 );
168 }
169 }
170 if !pending_edges.is_empty() {
171 if let Err(e) = self.storage.insert_graph_edges_batch(&pending_edges) {
172 tracing::warn!(
173 "Failed to batch-insert {} graph edges during creative consolidation: {e}",
174 pending_edges.len()
175 );
176 }
177 }
178
179 for node in pending_nodes {
181 let _ = graph.add_node(node);
182 }
183 for edge in pending_edges {
184 let _ = graph.add_edge(edge);
185 }
186
187 drop(vector);
188 drop(graph);
189
190 if let Err(e) = self
191 .storage
192 .insert_consolidation_log("creative", new_connections)
193 {
194 tracing::warn!("Failed to log creative consolidation: {e}");
195 }
196
197 Ok(ConsolidationResult {
198 cycle: "creative".to_string(),
199 affected: new_connections,
200 details: json!({
201 "algorithm": "vector_knn",
202 }),
203 })
204 }
205}