Skip to main content

mnemo_core/query/
conflict.rs

1use serde::{Deserialize, Serialize};
2use uuid::Uuid;
3
4use crate::error::Result;
5use crate::model::memory::{ConsolidationState, MemoryRecord, MemoryType, SourceType};
6use crate::query::MnemoEngine;
7use crate::storage::MemoryFilter;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct ConflictDetectionResult {
11    pub conflicts: Vec<ConflictPair>,
12}
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct ConflictPair {
16    pub memory_a: Uuid,
17    pub memory_b: Uuid,
18    pub similarity: f32,
19    pub reason: String,
20}
21
22#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
23#[serde(rename_all = "snake_case")]
24pub enum ResolutionStrategy {
25    KeepNewest,
26    KeepHighestImportance,
27    MergeIntoSemantic,
28    Manual,
29    EvidenceWeighted,
30}
31
32/// Scoring components for evidence-weighted conflict resolution.
33#[derive(Debug, Clone, Serialize)]
34pub struct ConflictEvidence {
35    pub source_reliability: f32,
36    pub recency_score: f32,
37    pub usage_score: f32,
38    pub importance_score: f32,
39    pub similarity_bonus: f32,
40    pub composite_weight: f32,
41}
42
43/// Returns a reliability score for each source type.
44/// Higher values = more trustworthy.
45pub fn source_reliability(st: SourceType) -> f32 {
46    match st {
47        SourceType::ToolOutput => 0.9,
48        SourceType::Human | SourceType::UserInput => 0.8,
49        SourceType::System => 0.75,
50        SourceType::ModelResponse => 0.7,
51        SourceType::Agent => 0.6,
52        SourceType::Consolidation => 0.5,
53        SourceType::Retrieval => 0.4,
54        SourceType::Import => 0.3,
55    }
56}
57
58fn compute_evidence(record: &MemoryRecord, max_access: u64, similarity: f32) -> ConflictEvidence {
59    let src_rel = source_reliability(record.source_type);
60    let recency = crate::query::retrieval::recency_score(&record.created_at, 168.0);
61    let usage = if max_access > 0 {
62        record.access_count as f32 / max_access as f32
63    } else {
64        0.0
65    };
66    let importance = record.importance;
67    let sim_bonus = similarity;
68
69    let composite =
70        src_rel * 0.3 + recency * 0.2 + usage * 0.2 + importance * 0.2 + sim_bonus * 0.1;
71
72    ConflictEvidence {
73        source_reliability: src_rel,
74        recency_score: recency,
75        usage_score: usage,
76        importance_score: importance,
77        similarity_bonus: sim_bonus,
78        composite_weight: composite,
79    }
80}
81
82/// Detect potential conflicts (near-duplicate memories) for an agent.
83/// Uses the vector index to find memories with cosine similarity above threshold.
84pub async fn detect_conflicts(
85    engine: &MnemoEngine,
86    agent_id: &str,
87    threshold: f32,
88) -> Result<ConflictDetectionResult> {
89    let filter = MemoryFilter {
90        agent_id: Some(agent_id.to_string()),
91        include_deleted: false,
92        ..Default::default()
93    };
94    let memories = engine.storage.list_memories(&filter, 1000, 0).await?;
95
96    let mut conflicts = Vec::new();
97    let mut checked: std::collections::HashSet<(Uuid, Uuid)> = std::collections::HashSet::new();
98
99    for record in &memories {
100        if record.quarantined {
101            continue;
102        }
103        let embedding = match &record.embedding {
104            Some(e) => e,
105            None => continue,
106        };
107
108        // Search for similar memories using the vector index
109        let results = engine.index.search(embedding, 20)?;
110
111        for (candidate_id, distance) in results {
112            if candidate_id == record.id {
113                continue;
114            }
115            let similarity = 1.0 - distance;
116            if similarity < threshold {
117                continue;
118            }
119
120            // Avoid duplicate pairs
121            let pair = if record.id < candidate_id {
122                (record.id, candidate_id)
123            } else {
124                (candidate_id, record.id)
125            };
126            if !checked.insert(pair) {
127                continue;
128            }
129
130            // Verify the candidate belongs to the same agent
131            if let Some(candidate) = engine.storage.get_memory(candidate_id).await?
132                && !(candidate.agent_id != agent_id
133                    || candidate.is_deleted()
134                    || candidate.quarantined)
135                && candidate.content != record.content
136            {
137                conflicts.push(ConflictPair {
138                    memory_a: record.id,
139                    memory_b: candidate_id,
140                    similarity,
141                    reason: format!(
142                        "High semantic similarity ({:.3}) between different content",
143                        similarity
144                    ),
145                });
146            }
147        }
148    }
149
150    Ok(ConflictDetectionResult { conflicts })
151}
152
153/// Resolve a detected conflict using the specified strategy.
154pub async fn resolve_conflict(
155    engine: &MnemoEngine,
156    conflict: &ConflictPair,
157    strategy: ResolutionStrategy,
158) -> Result<()> {
159    let mem_a = engine
160        .storage
161        .get_memory(conflict.memory_a)
162        .await?
163        .ok_or_else(|| {
164            crate::error::Error::NotFound(format!("memory {} not found", conflict.memory_a))
165        })?;
166    let mem_b = engine
167        .storage
168        .get_memory(conflict.memory_b)
169        .await?
170        .ok_or_else(|| {
171            crate::error::Error::NotFound(format!("memory {} not found", conflict.memory_b))
172        })?;
173
174    match strategy {
175        ResolutionStrategy::KeepNewest => {
176            // Soft-delete the older memory
177            if mem_a.created_at >= mem_b.created_at {
178                engine.storage.soft_delete_memory(mem_b.id).await?;
179            } else {
180                engine.storage.soft_delete_memory(mem_a.id).await?;
181            }
182        }
183        ResolutionStrategy::KeepHighestImportance => {
184            if mem_a.importance >= mem_b.importance {
185                engine.storage.soft_delete_memory(mem_b.id).await?;
186            } else {
187                engine.storage.soft_delete_memory(mem_a.id).await?;
188            }
189        }
190        ResolutionStrategy::MergeIntoSemantic => {
191            // Create new semantic memory combining both, soft-delete originals
192            let combined_content = format!("{} | {}", mem_a.content, mem_b.content);
193            let avg_importance = (mem_a.importance + mem_b.importance) / 2.0;
194            let mut all_tags: Vec<String> = mem_a.tags.clone();
195            for t in &mem_b.tags {
196                if !all_tags.contains(t) {
197                    all_tags.push(t.clone());
198                }
199            }
200
201            let now = chrono::Utc::now().to_rfc3339();
202            let embedding = engine.embedding.embed(&combined_content).await?;
203            let content_hash =
204                crate::hash::compute_content_hash(&combined_content, &mem_a.agent_id, &now);
205
206            let prev_hash_raw = engine
207                .storage
208                .get_latest_memory_hash(&mem_a.agent_id, None)
209                .await
210                .ok()
211                .flatten();
212            let prev_hash = Some(crate::hash::compute_chain_hash(
213                &content_hash,
214                prev_hash_raw.as_deref(),
215            ));
216
217            let new_record = MemoryRecord {
218                id: Uuid::now_v7(),
219                agent_id: mem_a.agent_id.clone(),
220                content: combined_content,
221                memory_type: MemoryType::Semantic,
222                scope: mem_a.scope,
223                importance: avg_importance,
224                tags: all_tags,
225                metadata: serde_json::json!({
226                    "merged_from": [mem_a.id.to_string(), mem_b.id.to_string()]
227                }),
228                embedding: Some(embedding.clone()),
229                content_hash,
230                prev_hash,
231                source_type: SourceType::Consolidation,
232                source_id: None,
233                consolidation_state: ConsolidationState::Active,
234                access_count: 0,
235                org_id: mem_a.org_id.clone(),
236                thread_id: None,
237                created_at: now.clone(),
238                updated_at: now,
239                last_accessed_at: None,
240                expires_at: None,
241                deleted_at: None,
242                decay_rate: None,
243                created_by: Some("conflict_resolution".to_string()),
244                version: 1,
245                prev_version_id: None,
246                quarantined: false,
247                quarantine_reason: None,
248                decay_function: None,
249            };
250
251            engine.storage.insert_memory(&new_record).await?;
252            engine.index.add(new_record.id, &embedding)?;
253            if let Some(ref ft) = engine.full_text {
254                ft.add(new_record.id, &new_record.content)?;
255                ft.commit()?;
256            }
257
258            engine.storage.soft_delete_memory(mem_a.id).await?;
259            engine.storage.soft_delete_memory(mem_b.id).await?;
260        }
261        ResolutionStrategy::Manual => {
262            // No-op: just flag for manual review
263        }
264        ResolutionStrategy::EvidenceWeighted => {
265            let max_access = mem_a.access_count.max(mem_b.access_count);
266            let evidence_a = compute_evidence(&mem_a, max_access, conflict.similarity);
267            let evidence_b = compute_evidence(&mem_b, max_access, conflict.similarity);
268
269            let (winner, loser, winner_evidence, loser_evidence) =
270                if evidence_a.composite_weight >= evidence_b.composite_weight {
271                    (&mem_a, &mem_b, &evidence_a, &evidence_b)
272                } else {
273                    (&mem_b, &mem_a, &evidence_b, &evidence_a)
274                };
275
276            // Soft-delete the loser
277            engine.storage.soft_delete_memory(loser.id).await?;
278
279            // Store resolution metadata in winner's metadata
280            let mut winner_record = winner.clone();
281            let mut meta = winner_record
282                .metadata
283                .as_object()
284                .cloned()
285                .unwrap_or_default();
286            meta.insert(
287                "conflict_resolution".to_string(),
288                serde_json::json!({
289                    "strategy": "evidence_weighted",
290                    "defeated_id": loser.id.to_string(),
291                    "winner_score": winner_evidence.composite_weight,
292                    "loser_score": loser_evidence.composite_weight,
293                    "winner_evidence": {
294                        "source_reliability": winner_evidence.source_reliability,
295                        "recency_score": winner_evidence.recency_score,
296                        "usage_score": winner_evidence.usage_score,
297                        "importance_score": winner_evidence.importance_score,
298                    },
299                }),
300            );
301            winner_record.metadata = serde_json::Value::Object(meta);
302            engine.storage.update_memory(&winner_record).await?;
303        }
304    }
305
306    Ok(())
307}