1use super::error::GraphError;
4use super::llm::LlmProvider;
5use super::types::*;
6use super::GraphMemory;
7
/// System prompt passed to the LLM by `resolve_entity` when a candidate has
/// similar existing entities.
///
/// It asks for a single JSON object with `decision`, `target` and `reason`
/// keys; `parse_dedup_response` reads `decision` and, for merges, `target`.
const DEDUP_SYSTEM_PROMPT: &str = r#"You are a deduplication system for a knowledge graph. Given a candidate entity and existing similar entities, decide:

1. "skip" — The candidate is a duplicate. It adds no new information.
2. "create" — The candidate is genuinely new despite surface similarity.
3. "merge" — The candidate adds new information to an existing entity. Specify which one.

Return EXACTLY this JSON (no markdown fencing, no explanation):

{
 "decision": "skip" | "create" | "merge",
 "target": "Name of existing entity to merge into (only if merge)",
 "reason": "Brief explanation"
}

Rules:
- Same entity with minor name variations (e.g., "ElevenLabs" vs "Eleven Labs"): merge
- Same concept but genuinely different instances: create
- Candidate adds meaningful new detail to an existing entity: merge
- Candidate is less detailed than existing: skip
- When in doubt between create and merge: prefer create (avoid data loss)"#;
28
/// Outcome of running one extracted candidate through `resolve_entity`.
pub enum ResolvedEntity {
    /// A new entity was added for the candidate; carries the value returned
    /// by `GraphMemory::add_entity`.
    Created(Entity),
    /// The candidate was folded into an existing entity; carries the value
    /// returned by the update performed in `merge_entity`.
    Merged(Entity),
    /// The deduplication decision was "skip"; `resolve_entity` wrote nothing.
    Skipped,
}
35
36pub async fn resolve_entity(
43 gm: &GraphMemory,
44 llm: &dyn LlmProvider,
45 candidate: &ExtractedEntity,
46 session_id: &str,
47) -> Result<ResolvedEntity, GraphError> {
48 let similar = gm.search(&candidate.abstract_text, 5).await?;
50
51 let relevant: Vec<_> = similar.iter().filter(|r| r.score > 0.7).collect();
53
54 if relevant.is_empty() {
55 let entity = gm
57 .add_entity(NewEntity {
58 name: candidate.name.clone(),
59 entity_type: candidate.entity_type.clone(),
60 abstract_text: candidate.abstract_text.clone(),
61 overview: candidate.overview.clone(),
62 content: candidate.content.clone(),
63 attributes: candidate.attributes.clone(),
64 source: Some(session_id.to_string()),
65 })
66 .await?;
67 return Ok(ResolvedEntity::Created(entity));
68 }
69
70 let user_message = build_dedup_message(candidate, &relevant);
72 let response = llm
73 .complete(DEDUP_SYSTEM_PROMPT, &user_message, 300)
74 .await?;
75
76 let decision = parse_dedup_response(&response)?;
77
78 match decision {
79 DedupDecision::Skip => Ok(ResolvedEntity::Skipped),
80
81 DedupDecision::Create => {
82 let entity = gm
83 .add_entity(NewEntity {
84 name: candidate.name.clone(),
85 entity_type: candidate.entity_type.clone(),
86 abstract_text: candidate.abstract_text.clone(),
87 overview: candidate.overview.clone(),
88 content: candidate.content.clone(),
89 attributes: candidate.attributes.clone(),
90 source: Some(session_id.to_string()),
91 })
92 .await?;
93 Ok(ResolvedEntity::Created(entity))
94 }
95
96 DedupDecision::Merge { target } => {
97 let target_entity = gm.get_entity(&target).await?;
99 let Some(target_entity) = target_entity else {
100 let entity = gm
102 .add_entity(NewEntity {
103 name: candidate.name.clone(),
104 entity_type: candidate.entity_type.clone(),
105 abstract_text: candidate.abstract_text.clone(),
106 overview: candidate.overview.clone(),
107 content: candidate.content.clone(),
108 attributes: candidate.attributes.clone(),
109 source: Some(session_id.to_string()),
110 })
111 .await?;
112 return Ok(ResolvedEntity::Created(entity));
113 };
114
115 if !target_entity.mutable {
117 let entity = gm
119 .add_entity(NewEntity {
120 name: candidate.name.clone(),
121 entity_type: candidate.entity_type.clone(),
122 abstract_text: candidate.abstract_text.clone(),
123 overview: candidate.overview.clone(),
124 content: candidate.content.clone(),
125 attributes: candidate.attributes.clone(),
126 source: Some(session_id.to_string()),
127 })
128 .await?;
129 return Ok(ResolvedEntity::Created(entity));
130 }
131
132 let merged = merge_entity(gm, &target_entity, candidate).await?;
133 Ok(ResolvedEntity::Merged(merged))
134 }
135 }
136}
137
138async fn merge_entity(
146 gm: &GraphMemory,
147 target: &Entity,
148 candidate: &ExtractedEntity,
149) -> Result<Entity, GraphError> {
150 let new_abstract = if candidate.abstract_text.len() > target.abstract_text.len() {
151 Some(candidate.abstract_text.clone())
152 } else {
153 None
154 };
155
156 let new_overview = candidate.overview.as_ref().map(|co| {
157 if target.overview.is_empty() {
158 co.clone()
159 } else {
160 format!("{}\n\n{}", target.overview, co)
161 }
162 });
163
164 let new_content = candidate.content.as_ref().map(|cc| match &target.content {
165 Some(tc) => format!("{}\n\n{}", tc, cc),
166 None => cc.clone(),
167 });
168
169 let new_attributes = candidate
170 .attributes
171 .as_ref()
172 .map(|ca| match &target.attributes {
173 Some(ta) => merge_json_objects(ta, ca),
174 None => ca.clone(),
175 });
176
177 let updates = EntityUpdate {
178 abstract_text: new_abstract,
179 overview: new_overview,
180 content: new_content,
181 attributes: new_attributes,
182 };
183
184 gm.update_entity(&target.id_string(), updates).await
185}
186
187fn build_dedup_message(candidate: &ExtractedEntity, similar: &[&SearchResult]) -> String {
188 let mut msg = format!(
189 "CANDIDATE:\n Name: {}\n Type: {}\n Abstract: {}\n\nEXISTING SIMILAR ENTITIES:\n",
190 candidate.name, candidate.entity_type, candidate.abstract_text
191 );
192 for (i, r) in similar.iter().enumerate() {
193 msg.push_str(&format!(
194 "\n{}. Name: {} (score: {:.3})\n Type: {}\n Abstract: {}\n",
195 i + 1,
196 r.entity.name,
197 r.score,
198 r.entity.entity_type,
199 r.entity.abstract_text
200 ));
201 }
202 msg
203}
204
205pub fn parse_dedup_response(text: &str) -> Result<DedupDecision, GraphError> {
207 let cleaned = strip_markdown_fencing(text);
208
209 let v: serde_json::Value = serde_json::from_str(&cleaned).map_err(|e| {
210 if let Some(json_str) = extract_json_object(&cleaned) {
212 if let Ok(v) = serde_json::from_str::<serde_json::Value>(json_str) {
213 return parse_decision_value(&v)
214 .err()
215 .unwrap_or_else(|| GraphError::Parse(e.to_string()));
216 }
217 }
218 GraphError::Parse(format!("dedup response not valid JSON: {}", e))
219 })?;
220
221 parse_decision_value(&v)
222}
223
224fn parse_decision_value(v: &serde_json::Value) -> Result<DedupDecision, GraphError> {
225 let decision = v
226 .get("decision")
227 .and_then(|d| d.as_str())
228 .ok_or_else(|| GraphError::Parse("missing 'decision' field".into()))?;
229
230 match decision {
231 "skip" => Ok(DedupDecision::Skip),
232 "create" => Ok(DedupDecision::Create),
233 "merge" => {
234 let target = v
235 .get("target")
236 .and_then(|t| t.as_str())
237 .ok_or_else(|| GraphError::Parse("merge decision missing 'target' field".into()))?;
238 Ok(DedupDecision::Merge {
239 target: target.to_string(),
240 })
241 }
242 other => Err(GraphError::Parse(format!("unknown decision: {}", other))),
243 }
244}
245
/// Removes a surrounding markdown code fence from `text` and trims the result.
///
/// An opening fence may carry any info string on its own line (`json`,
/// `JSON`, `json5`, ...); that whole line is dropped. A `json` tag written on
/// the same line as the payload (`` ```json{...}``` ``) is also removed.
/// The opening and closing fences are handled independently, so text with
/// only one of them is still cleaned. Text without fences is just trimmed.
fn strip_markdown_fencing(text: &str) -> String {
    let trimmed = text.trim();
    let body = match trimmed.strip_prefix("```") {
        Some(after_fence) => drop_fence_info(after_fence),
        None => trimmed,
    };
    let body = body.strip_suffix("```").unwrap_or(body);
    body.trim().to_string()
}

/// Drops the info string that follows an opening fence, if there is one.
fn drop_fence_info(after_fence: &str) -> &str {
    if let Some((first_line, rest)) = after_fence.split_once('\n') {
        // `trim` also discards the `\r` of CRLF line endings.
        let tag = first_line.trim();
        let looks_like_tag = !tag.is_empty()
            && tag
                .chars()
                .all(|c| c.is_alphanumeric() || matches!(c, '_' | '-' | '+' | '.'));
        if looks_like_tag {
            return rest;
        }
    }
    // Payload shares the fence line: only the well-known `json` tag is removed.
    after_fence.strip_prefix("json").unwrap_or(after_fence)
}
255
/// Returns the first balanced `{ ... }` span in `text`, or `None` when there
/// is no `{` or the braces never balance.
///
/// Braces that appear inside JSON string literals are ignored, including
/// after escaped quotes (`\"`), so values such as `"a } b"` do not end the
/// object early. Scanning is byte-wise; every byte that affects the state is
/// ASCII, so the returned slice always falls on UTF-8 character boundaries.
fn extract_json_object(text: &str) -> Option<&str> {
    let start = text.find('{')?;
    let mut depth = 0usize;
    let mut in_string = false;
    let mut escaped = false;

    for (offset, &byte) in text.as_bytes()[start..].iter().enumerate() {
        if in_string {
            if escaped {
                // The byte after a backslash is consumed verbatim.
                escaped = false;
            } else if byte == b'\\' {
                escaped = true;
            } else if byte == b'"' {
                in_string = false;
            }
            continue;
        }

        match byte {
            b'"' => in_string = true,
            b'{' => depth += 1,
            b'}' => {
                // `depth` is at least 1 here: the scan starts on a `{` and
                // returns as soon as the count drops back to zero.
                depth -= 1;
                if depth == 0 {
                    return Some(&text[start..=start + offset]);
                }
            }
            _ => {}
        }
    }
    None
}
274
275fn merge_json_objects(base: &serde_json::Value, overlay: &serde_json::Value) -> serde_json::Value {
276 match (base, overlay) {
277 (serde_json::Value::Object(b), serde_json::Value::Object(o)) => {
278 let mut merged = b.clone();
279 for (k, v) in o {
280 merged.insert(k.clone(), v.clone());
281 }
282 serde_json::Value::Object(merged)
283 }
284 _ => overlay.clone(),
285 }
286}
287
/// Unit tests for the pure parsing and merging helpers in this module.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_skip_decision() {
        let json = r#"{"decision": "skip", "reason": "duplicate"}"#;
        let decision = parse_dedup_response(json).unwrap();
        assert_eq!(decision, DedupDecision::Skip);
    }

    #[test]
    fn parse_create_decision() {
        let json = r#"{"decision": "create", "reason": "genuinely new"}"#;
        let decision = parse_dedup_response(json).unwrap();
        assert_eq!(decision, DedupDecision::Create);
    }

    #[test]
    fn parse_merge_decision() {
        let json = r#"{"decision": "merge", "target": "Rust", "reason": "same entity"}"#;
        let decision = parse_dedup_response(json).unwrap();
        assert_eq!(
            decision,
            DedupDecision::Merge {
                target: "Rust".into()
            }
        );
    }

    #[test]
    fn parse_with_fencing() {
        let json = "```json\n{\"decision\": \"skip\", \"reason\": \"dup\"}\n```";
        let decision = parse_dedup_response(json).unwrap();
        assert_eq!(decision, DedupDecision::Skip);
    }

    // Error paths: each malformed reply must be rejected rather than guessed at.

    #[test]
    fn parse_rejects_non_json() {
        assert!(parse_dedup_response("not json at all").is_err());
    }

    #[test]
    fn parse_rejects_missing_decision() {
        assert!(parse_dedup_response(r#"{"reason": "no decision"}"#).is_err());
    }

    #[test]
    fn parse_rejects_unknown_decision() {
        assert!(parse_dedup_response(r#"{"decision": "maybe"}"#).is_err());
    }

    #[test]
    fn parse_rejects_merge_without_target() {
        assert!(parse_dedup_response(r#"{"decision": "merge", "reason": "x"}"#).is_err());
    }

    #[test]
    fn strip_fencing_without_language_tag() {
        assert_eq!(strip_markdown_fencing("```\n{}\n```"), "{}");
        assert_eq!(strip_markdown_fencing("  {}  "), "{}");
    }

    #[test]
    fn extract_json_object_finds_nested_object() {
        assert_eq!(
            extract_json_object("prefix {\"a\": {\"b\": 1}} suffix"),
            Some("{\"a\": {\"b\": 1}}")
        );
        assert_eq!(extract_json_object("no object here"), None);
    }

    #[test]
    fn merge_json_objects_test() {
        let base = serde_json::json!({"a": 1, "b": 2});
        let overlay = serde_json::json!({"b": 3, "c": 4});
        let merged = merge_json_objects(&base, &overlay);
        assert_eq!(merged, serde_json::json!({"a": 1, "b": 3, "c": 4}));
    }

    #[test]
    fn merge_json_objects_non_object_returns_overlay() {
        let base = serde_json::json!(1);
        let overlay = serde_json::json!({"a": 1});
        assert_eq!(merge_json_objects(&base, &overlay), overlay);

        let scalar_overlay = serde_json::json!("x");
        assert_eq!(
            merge_json_objects(&overlay, &scalar_overlay),
            scalar_overlay
        );
    }
}
332}