Skip to main content

recall_echo/graph/
dedup.rs

1//! LLM-powered entity deduplication — skip, create, or merge decisions.
2
3use super::error::GraphError;
4use super::llm::LlmProvider;
5use super::types::*;
6use super::GraphMemory;
7
/// System prompt for the dedup LLM call made by `resolve_entity`.
///
/// Asks the model for a single JSON object with a `decision` of `"skip"`,
/// `"create"`, or `"merge"`, a `target` entity name (meaningful only for
/// merges), and a short `reason`. The reply is parsed by `parse_dedup_response`.
const DEDUP_SYSTEM_PROMPT: &str = r#"You are a deduplication system for a knowledge graph. Given a candidate entity and existing similar entities, decide:

1. "skip" — The candidate is a duplicate. It adds no new information.
2. "create" — The candidate is genuinely new despite surface similarity.
3. "merge" — The candidate adds new information to an existing entity. Specify which one.

Return EXACTLY this JSON (no markdown fencing, no explanation):

{
  "decision": "skip" | "create" | "merge",
  "target": "Name of existing entity to merge into (only if merge)",
  "reason": "Brief explanation"
}

Rules:
- Same entity with minor name variations (e.g., "ElevenLabs" vs "Eleven Labs"): merge
- Same concept but genuinely different instances: create
- Candidate adds meaningful new detail to an existing entity: merge
- Candidate is less detailed than existing: skip
- When in doubt between create and merge: prefer create (avoid data loss)"#;
28
/// Outcome of running the dedup pipeline (`resolve_entity`) on one extracted entity.
pub enum ResolvedEntity {
    /// A new entity was inserted: either nothing similar was found, the LLM chose
    /// "create", or a merge had to fall back to creation.
    Created(Entity),
    /// The candidate was folded into an existing entity; holds the entity value
    /// returned by the graph update.
    Merged(Entity),
    /// The LLM judged the candidate a duplicate; nothing was written to the graph.
    Skipped,
}
35
36/// Run the full dedup pipeline for one extracted entity.
37///
38/// 1. Vector search for similar entities
39/// 2. If none similar: CREATE directly
40/// 3. If similar found: ask LLM for skip/create/merge decision
41/// 4. For merge on immutable types: fall back to CREATE
42pub async fn resolve_entity(
43    gm: &GraphMemory,
44    llm: &dyn LlmProvider,
45    candidate: &ExtractedEntity,
46    session_id: &str,
47) -> Result<ResolvedEntity, GraphError> {
48    // Search for similar entities
49    let similar = gm.search(&candidate.abstract_text, 5).await?;
50
51    // Filter to meaningful similarity (> 0.7 blended score)
52    let relevant: Vec<_> = similar.iter().filter(|r| r.score > 0.7).collect();
53
54    if relevant.is_empty() {
55        // No similar entities — create directly
56        let entity = gm
57            .add_entity(NewEntity {
58                name: candidate.name.clone(),
59                entity_type: candidate.entity_type.clone(),
60                abstract_text: candidate.abstract_text.clone(),
61                overview: candidate.overview.clone(),
62                content: candidate.content.clone(),
63                attributes: candidate.attributes.clone(),
64                source: Some(session_id.to_string()),
65            })
66            .await?;
67        return Ok(ResolvedEntity::Created(entity));
68    }
69
70    // Ask LLM for dedup decision
71    let user_message = build_dedup_message(candidate, &relevant);
72    let response = llm
73        .complete(DEDUP_SYSTEM_PROMPT, &user_message, 300)
74        .await?;
75
76    let decision = parse_dedup_response(&response)?;
77
78    match decision {
79        DedupDecision::Skip => Ok(ResolvedEntity::Skipped),
80
81        DedupDecision::Create => {
82            let entity = gm
83                .add_entity(NewEntity {
84                    name: candidate.name.clone(),
85                    entity_type: candidate.entity_type.clone(),
86                    abstract_text: candidate.abstract_text.clone(),
87                    overview: candidate.overview.clone(),
88                    content: candidate.content.clone(),
89                    attributes: candidate.attributes.clone(),
90                    source: Some(session_id.to_string()),
91                })
92                .await?;
93            Ok(ResolvedEntity::Created(entity))
94        }
95
96        DedupDecision::Merge { target } => {
97            // Find the target entity
98            let target_entity = gm.get_entity(&target).await?;
99            let Some(target_entity) = target_entity else {
100                // Target not found — fall back to create
101                let entity = gm
102                    .add_entity(NewEntity {
103                        name: candidate.name.clone(),
104                        entity_type: candidate.entity_type.clone(),
105                        abstract_text: candidate.abstract_text.clone(),
106                        overview: candidate.overview.clone(),
107                        content: candidate.content.clone(),
108                        attributes: candidate.attributes.clone(),
109                        source: Some(session_id.to_string()),
110                    })
111                    .await?;
112                return Ok(ResolvedEntity::Created(entity));
113            };
114
115            // Check mutability
116            if !target_entity.mutable {
117                // Immutable — can't merge, create instead
118                let entity = gm
119                    .add_entity(NewEntity {
120                        name: candidate.name.clone(),
121                        entity_type: candidate.entity_type.clone(),
122                        abstract_text: candidate.abstract_text.clone(),
123                        overview: candidate.overview.clone(),
124                        content: candidate.content.clone(),
125                        attributes: candidate.attributes.clone(),
126                        source: Some(session_id.to_string()),
127                    })
128                    .await?;
129                return Ok(ResolvedEntity::Created(entity));
130            }
131
132            let merged = merge_entity(gm, &target_entity, candidate).await?;
133            Ok(ResolvedEntity::Merged(merged))
134        }
135    }
136}
137
138/// Merge candidate data into an existing entity.
139///
140/// Rules:
141/// - Abstract: use longer/more detailed version
142/// - Overview: concatenate if both exist
143/// - Content: append candidate content
144/// - Attributes: deep-merge (candidate wins on conflict)
145async fn merge_entity(
146    gm: &GraphMemory,
147    target: &Entity,
148    candidate: &ExtractedEntity,
149) -> Result<Entity, GraphError> {
150    let new_abstract = if candidate.abstract_text.len() > target.abstract_text.len() {
151        Some(candidate.abstract_text.clone())
152    } else {
153        None
154    };
155
156    let new_overview = candidate.overview.as_ref().map(|co| {
157        if target.overview.is_empty() {
158            co.clone()
159        } else {
160            format!("{}\n\n{}", target.overview, co)
161        }
162    });
163
164    let new_content = candidate.content.as_ref().map(|cc| match &target.content {
165        Some(tc) => format!("{}\n\n{}", tc, cc),
166        None => cc.clone(),
167    });
168
169    let new_attributes = candidate
170        .attributes
171        .as_ref()
172        .map(|ca| match &target.attributes {
173            Some(ta) => merge_json_objects(ta, ca),
174            None => ca.clone(),
175        });
176
177    let updates = EntityUpdate {
178        abstract_text: new_abstract,
179        overview: new_overview,
180        content: new_content,
181        attributes: new_attributes,
182    };
183
184    gm.update_entity(&target.id_string(), updates).await
185}
186
187fn build_dedup_message(candidate: &ExtractedEntity, similar: &[&SearchResult]) -> String {
188    let mut msg = format!(
189        "CANDIDATE:\n  Name: {}\n  Type: {}\n  Abstract: {}\n\nEXISTING SIMILAR ENTITIES:\n",
190        candidate.name, candidate.entity_type, candidate.abstract_text
191    );
192    for (i, r) in similar.iter().enumerate() {
193        msg.push_str(&format!(
194            "\n{}. Name: {} (score: {:.3})\n   Type: {}\n   Abstract: {}\n",
195            i + 1,
196            r.entity.name,
197            r.score,
198            r.entity.entity_type,
199            r.entity.abstract_text
200        ));
201    }
202    msg
203}
204
205/// Parse the LLM's dedup decision from JSON.
206pub fn parse_dedup_response(text: &str) -> Result<DedupDecision, GraphError> {
207    let cleaned = strip_markdown_fencing(text);
208
209    let v: serde_json::Value = serde_json::from_str(&cleaned).map_err(|e| {
210        // Try extracting JSON from surrounding text
211        if let Some(json_str) = extract_json_object(&cleaned) {
212            if let Ok(v) = serde_json::from_str::<serde_json::Value>(json_str) {
213                return parse_decision_value(&v)
214                    .err()
215                    .unwrap_or_else(|| GraphError::Parse(e.to_string()));
216            }
217        }
218        GraphError::Parse(format!("dedup response not valid JSON: {}", e))
219    })?;
220
221    parse_decision_value(&v)
222}
223
224fn parse_decision_value(v: &serde_json::Value) -> Result<DedupDecision, GraphError> {
225    let decision = v
226        .get("decision")
227        .and_then(|d| d.as_str())
228        .ok_or_else(|| GraphError::Parse("missing 'decision' field".into()))?;
229
230    match decision {
231        "skip" => Ok(DedupDecision::Skip),
232        "create" => Ok(DedupDecision::Create),
233        "merge" => {
234            let target = v
235                .get("target")
236                .and_then(|t| t.as_str())
237                .ok_or_else(|| GraphError::Parse("merge decision missing 'target' field".into()))?;
238            Ok(DedupDecision::Merge {
239                target: target.to_string(),
240            })
241        }
242        other => Err(GraphError::Parse(format!("unknown decision: {}", other))),
243    }
244}
245
/// Remove a surrounding Markdown code fence from an LLM reply.
///
/// Leading and trailing whitespace is trimmed. An opening triple-backtick fence
/// is then removed, together with any alphanumeric language tag that follows it
/// (`json`, `JSON`, `javascript`, ...). A closing fence is removed if present,
/// and the result is trimmed again.
fn strip_markdown_fencing(text: &str) -> String {
    let trimmed = text.trim();
    let body = match trimmed.strip_prefix("```") {
        // Drop the fence's info string; a JSON object never starts with an
        // alphanumeric character, so this cannot eat into the payload we want.
        Some(rest) => rest.trim_start_matches(|c: char| c.is_ascii_alphanumeric()),
        None => trimmed,
    };
    let body = body.strip_suffix("```").unwrap_or(body);
    body.trim().to_string()
}
255
/// Return the first balanced `{...}` object embedded in `text`, if any.
///
/// Braces inside JSON string literals are ignored, including strings that contain
/// escaped quotes, so a `reason` such as `"see {X"` does not throw off the
/// matching. Returns `None` if `text` has no `{` or the first object never closes.
fn extract_json_object(text: &str) -> Option<&str> {
    let start = text.find('{')?;
    let mut depth = 0usize;
    let mut in_string = false;
    let mut escaped = false;
    for (offset, &byte) in text.as_bytes()[start..].iter().enumerate() {
        if in_string {
            if escaped {
                escaped = false;
            } else if byte == b'\\' {
                escaped = true;
            } else if byte == b'"' {
                in_string = false;
            }
            continue;
        }
        match byte {
            b'"' => in_string = true,
            b'{' => depth += 1,
            b'}' => {
                // The first byte is always `{`, so depth is >= 1 here.
                depth -= 1;
                if depth == 0 {
                    // `}` is ASCII, so the end index is a valid char boundary.
                    return Some(&text[start..=start + offset]);
                }
            }
            _ => {}
        }
    }
    None
}
274
275fn merge_json_objects(base: &serde_json::Value, overlay: &serde_json::Value) -> serde_json::Value {
276    match (base, overlay) {
277        (serde_json::Value::Object(b), serde_json::Value::Object(o)) => {
278            let mut merged = b.clone();
279            for (k, v) in o {
280                merged.insert(k.clone(), v.clone());
281            }
282            serde_json::Value::Object(merged)
283        }
284        _ => overlay.clone(),
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_skip_decision() {
        let json = r#"{"decision": "skip", "reason": "duplicate"}"#;
        let decision = parse_dedup_response(json).unwrap();
        assert_eq!(decision, DedupDecision::Skip);
    }

    #[test]
    fn parse_create_decision() {
        let json = r#"{"decision": "create", "reason": "genuinely new"}"#;
        let decision = parse_dedup_response(json).unwrap();
        assert_eq!(decision, DedupDecision::Create);
    }

    #[test]
    fn parse_merge_decision() {
        let json = r#"{"decision": "merge", "target": "Rust", "reason": "same entity"}"#;
        let decision = parse_dedup_response(json).unwrap();
        assert_eq!(
            decision,
            DedupDecision::Merge {
                target: "Rust".into()
            }
        );
    }

    #[test]
    fn parse_with_fencing() {
        let json = "```json\n{\"decision\": \"skip\", \"reason\": \"dup\"}\n```";
        let decision = parse_dedup_response(json).unwrap();
        assert_eq!(decision, DedupDecision::Skip);
    }

    #[test]
    fn parse_unknown_decision_is_error() {
        let json = r#"{"decision": "maybe", "reason": "unsure"}"#;
        assert!(parse_dedup_response(json).is_err());
    }

    #[test]
    fn parse_missing_decision_is_error() {
        let json = r#"{"reason": "no decision given"}"#;
        assert!(parse_dedup_response(json).is_err());
    }

    #[test]
    fn parse_merge_without_target_is_error() {
        let json = r#"{"decision": "merge", "reason": "same entity"}"#;
        assert!(parse_dedup_response(json).is_err());
    }

    #[test]
    fn parse_non_json_is_error() {
        assert!(parse_dedup_response("I think this is a duplicate.").is_err());
    }

    #[test]
    fn extract_json_object_handles_nesting() {
        let text = r#"prefix {"a": {"b": 1}} suffix"#;
        assert_eq!(extract_json_object(text), Some(r#"{"a": {"b": 1}}"#));
    }

    #[test]
    fn merge_json_objects_test() {
        let base = serde_json::json!({"a": 1, "b": 2});
        let overlay = serde_json::json!({"b": 3, "c": 4});
        let merged = merge_json_objects(&base, &overlay);
        assert_eq!(merged, serde_json::json!({"a": 1, "b": 3, "c": 4}));
    }

    #[test]
    fn merge_json_objects_non_object_overlay_wins() {
        let base = serde_json::json!({"a": 1});
        let overlay = serde_json::json!(5);
        assert_eq!(merge_json_objects(&base, &overlay), serde_json::json!(5));
    }
}