/// Maximum number of edge ids placed in a single `IN (...)` list by `annotate_conflicts`.
///
/// Every chunk of ids is bound twice per statement (once for `edge_a_id`, once for
/// `edge_b_id`), so one query carries at most `2 * 499 = 998` bound parameters.
// NOTE(review): presumably sized to stay below SQLite's historical default limit of 999
// bound parameters per statement — confirm against the deployed SQLite build.
const MAX_IDS_PER_QUERY: usize = 499;
16
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use zeph_config::ImplicitConflictConfig;
20use zeph_db::DbTransaction;
21
22use crate::error::MemoryError;
23use crate::graph::activation::ActivatedFact;
24
/// A pair of edges whose predicates are similar enough to be staged as a possible
/// implicit conflict.
///
/// `PartialEq` is derived so that callers and tests can compare candidates directly.
#[derive(Debug, Clone, PartialEq)]
pub struct ConflictCandidate {
    /// Id of the first edge of the pair (the newly added edge when produced by
    /// `ImplicitConflictDetector::detect_candidates`).
    pub edge_a_id: i64,
    /// Id of the second edge of the pair (the pre-existing edge when produced by
    /// `ImplicitConflictDetector::detect_candidates`).
    pub edge_b_id: i64,
    /// Similarity score between the two predicates that triggered the candidate.
    pub similarity: f64,
    /// Name of the similarity method that produced the score (e.g. `"levenshtein"`).
    pub method: String,
}
37
/// Finds edges whose predicates are near-duplicates of a new edge's predicate and
/// stages them as pending implicit-conflict candidates.
pub struct ImplicitConflictDetector {
    /// Settings read by the detector: the `enabled` switch, the
    /// `conflict_similarity_threshold`, and `candidate_ttl_days`.
    config: ImplicitConflictConfig,
}
57
58impl ImplicitConflictDetector {
59 #[must_use]
61 pub fn new(config: ImplicitConflictConfig) -> Self {
62 Self { config }
63 }
64
65 #[must_use]
82 pub fn detect_candidates(
83 &self,
84 new_edge_id: i64,
85 new_predicate: &str,
86 existing: &[(i64, &str)],
87 is_cardinality_n: bool,
88 ) -> Vec<ConflictCandidate> {
89 let _span = tracing::info_span!(
90 "memory.graph.implicit_conflict.detect",
91 predicate = new_predicate,
92 )
93 .entered();
94
95 if !self.config.enabled || is_cardinality_n || existing.is_empty() {
96 return Vec::new();
97 }
98
99 let threshold = self.config.conflict_similarity_threshold;
100 let mut candidates = Vec::new();
101
102 for &(edge_id, predicate) in existing {
103 if predicate == new_predicate {
104 continue;
106 }
107 let sim = Self::normalized_levenshtein(new_predicate, predicate);
108 if sim >= threshold {
109 candidates.push(ConflictCandidate {
110 edge_a_id: new_edge_id,
111 edge_b_id: edge_id,
112 similarity: sim,
113 method: "levenshtein".to_owned(),
114 });
115 }
116 }
117
118 candidates
119 }
120
121 pub async fn stage_candidates(
130 &self,
131 candidates: &[ConflictCandidate],
132 tx: &mut DbTransaction<'_>,
133 ttl_days: u32,
134 ) -> Result<(), MemoryError> {
135 if candidates.is_empty() {
136 return Ok(());
137 }
138
139 let _span = tracing::info_span!(
140 "memory.graph.implicit_conflict.stage",
141 count = candidates.len(),
142 )
143 .entered();
144
145 #[allow(clippy::cast_possible_wrap)]
146 let now = SystemTime::now()
147 .duration_since(UNIX_EPOCH)
148 .unwrap_or_default()
149 .as_secs() as i64;
150 let ttl_secs = i64::from(ttl_days) * 86_400;
151 let expires_at = now + ttl_secs;
152
153 for c in candidates {
154 sqlx::query(
155 "INSERT INTO implicit_conflict_candidates
156 (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at)
157 VALUES (?, ?, ?, ?, 'pending', ?, ?)",
158 )
159 .bind(c.edge_a_id)
160 .bind(c.edge_b_id)
161 .bind(c.similarity)
162 .bind(&c.method)
163 .bind(now)
164 .bind(expires_at)
165 .execute(&mut **tx)
166 .await
167 .map_err(MemoryError::from)?;
168 }
169
170 Ok(())
171 }
172
173 #[must_use]
178 pub fn normalized_levenshtein(a: &str, b: &str) -> f64 {
179 if a == b {
180 return 1.0;
181 }
182 let len_a = a.chars().count();
183 let len_b = b.chars().count();
184 if len_a == 0 && len_b == 0 {
185 return 1.0;
186 }
187 if len_a == 0 || len_b == 0 {
188 return 0.0;
189 }
190 let dist = levenshtein_distance(a, b);
191 let max_len = len_a.max(len_b);
192 #[allow(clippy::cast_precision_loss)]
193 let result = 1.0 - (dist as f64 / max_len as f64);
194 result
195 }
196
197 #[must_use]
199 pub fn is_enabled(&self) -> bool {
200 self.config.enabled
201 }
202
203 #[must_use]
205 pub fn candidate_ttl_days(&self) -> u32 {
206 self.config.candidate_ttl_days
207 }
208}
209
210pub async fn annotate_conflicts(
219 facts: &mut [ActivatedFact],
220 tx: &mut DbTransaction<'_>,
221) -> Result<(), MemoryError> {
222 if facts.is_empty() {
223 return Ok(());
224 }
225
226 let _span = tracing::info_span!(
227 "memory.graph.implicit_conflict.annotate",
228 facts = facts.len(),
229 )
230 .entered();
231
232 let edge_ids: Vec<i64> = facts.iter().map(|f| f.edge.id).collect();
233
234 let mut edge_to_candidate: std::collections::HashMap<i64, i64> =
235 std::collections::HashMap::new();
236
237 for chunk in edge_ids.chunks(MAX_IDS_PER_QUERY) {
238 let placeholders: String = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
239 let query_str = format!(
240 "SELECT id, edge_a_id, edge_b_id
241 FROM implicit_conflict_candidates
242 WHERE status = 'pending'
243 AND (edge_a_id IN ({placeholders}) OR edge_b_id IN ({placeholders}))",
244 );
245
246 let mut q = sqlx::query(&query_str);
247 for id in chunk {
248 q = q.bind(id);
249 }
250 for id in chunk {
252 q = q.bind(id);
253 }
254
255 let rows = q.fetch_all(&mut **tx).await.map_err(MemoryError::from)?;
256
257 for row in rows {
258 use sqlx::Row as _;
259 let candidate_id: i64 = row.try_get("id").map_err(MemoryError::from)?;
260 let ea: i64 = row.try_get("edge_a_id").map_err(MemoryError::from)?;
261 let eb: i64 = row.try_get("edge_b_id").map_err(MemoryError::from)?;
262 edge_to_candidate.entry(ea).or_insert(candidate_id);
263 edge_to_candidate.entry(eb).or_insert(candidate_id);
264 }
265 }
266
267 for fact in facts.iter_mut() {
268 if let Some(&cid) = edge_to_candidate.get(&fact.edge.id) {
269 fact.is_implicit_conflict = true;
270 fact.conflict_candidate_id = Some(cid);
271 }
272 }
273
274 Ok(())
275}
276
/// Computes the Levenshtein edit distance between `a` and `b`, counted in `char`s.
///
/// Uses the two-row dynamic-programming formulation: `above` holds the distances for the
/// previous character of `a`, `row` is filled for the current one, and the two buffers
/// are swapped after each character.
fn levenshtein_distance(a: &str, b: &str) -> usize {
    let target: Vec<char> = b.chars().collect();
    let source: Vec<char> = a.chars().collect();
    let width = target.len();

    // Distance from the empty prefix of `a` to each prefix of `b`.
    let mut above: Vec<usize> = (0..=width).collect();
    let mut row = vec![0usize; width + 1];

    for (i, &from) in source.iter().enumerate() {
        // Distance from the first `i + 1` chars of `a` to the empty prefix of `b`.
        row[0] = i + 1;
        for (j, &to) in target.iter().enumerate() {
            let deletion = above[j + 1] + 1;
            let insertion = row[j] + 1;
            let substitution = above[j] + usize::from(from != to);
            row[j + 1] = deletion.min(insertion).min(substitution);
        }
        std::mem::swap(&mut above, &mut row);
    }

    above[width]
}
298
#[cfg(test)]
mod tests {
    use super::*;
    use zeph_config::ImplicitConflictConfig;

    /// Builds a detector with a 0.80 similarity threshold and the given `enabled` flag.
    fn detector(enabled: bool) -> ImplicitConflictDetector {
        let config = ImplicitConflictConfig {
            enabled,
            conflict_similarity_threshold: 0.80,
            ..Default::default()
        };
        ImplicitConflictDetector::new(config)
    }

    /// Returns `true` when `actual` lies within `f64::EPSILON` of `expected`.
    fn approx_eq(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < f64::EPSILON
    }

    #[test]
    fn normalized_levenshtein_identical() {
        let score = ImplicitConflictDetector::normalized_levenshtein("uses", "uses");
        assert!(approx_eq(score, 1.0));
    }

    #[test]
    fn normalized_levenshtein_empty_both() {
        let score = ImplicitConflictDetector::normalized_levenshtein("", "");
        assert!(approx_eq(score, 1.0));
    }

    #[test]
    fn normalized_levenshtein_empty_one() {
        let left_empty = ImplicitConflictDetector::normalized_levenshtein("", "abc");
        assert!(approx_eq(left_empty, 0.0));

        let right_empty = ImplicitConflictDetector::normalized_levenshtein("abc", "");
        assert!(approx_eq(right_empty, 0.0));
    }

    #[test]
    fn normalized_levenshtein_completely_different() {
        let sim = ImplicitConflictDetector::normalized_levenshtein("uses", "xyz_unrelated_value");
        assert!(sim < 0.5, "expected low similarity, got {sim}");
    }

    #[test]
    fn detect_candidates_above_threshold_returns_candidate() {
        // "employ" vs "employs": one edit over seven chars, above the 0.80 threshold.
        let found = detector(true).detect_candidates(42, "employ", &[(7, "employs")], false);
        assert_eq!(found.len(), 1, "expected one candidate");

        let first = &found[0];
        assert_eq!(first.edge_a_id, 42);
        assert_eq!(first.edge_b_id, 7);
        assert!(first.similarity >= 0.80);
    }

    #[test]
    fn detect_candidates_below_threshold_returns_empty() {
        let found = detector(true).detect_candidates(1, "uses", &[(2, "xyz_unrelated")], false);
        assert!(found.is_empty(), "expected no candidates below threshold");
    }

    #[test]
    fn detect_candidates_identical_predicate_skipped() {
        let found = detector(true).detect_candidates(1, "uses", &[(2, "uses")], false);
        assert!(
            found.is_empty(),
            "identical predicates must not create candidates"
        );
    }

    #[test]
    fn detect_candidates_disabled_returns_empty() {
        let found = detector(false).detect_candidates(1, "employ", &[(2, "employs")], false);
        assert!(found.is_empty(), "disabled detector must return empty");
    }

    #[test]
    fn detect_candidates_cardinality_n_skipped() {
        let found = detector(true).detect_candidates(1, "employ", &[(2, "employs")], true);
        assert!(found.is_empty(), "cardinality-n predicate must be skipped");
    }

    /// Opens a fresh in-memory store for a single test.
    async fn setup_test_db() -> crate::store::SqliteStore {
        let store = crate::store::SqliteStore::new(":memory:").await;
        store.unwrap()
    }

    /// Builds an un-annotated fact whose edge has the given id and fixed filler values.
    fn stub_fact(edge_id: i64) -> ActivatedFact {
        use crate::graph::types::{Edge, EdgeType};

        let edge = Edge {
            id: edge_id,
            source_entity_id: 1,
            target_entity_id: 2,
            relation: "test".to_owned(),
            canonical_relation: "test".to_owned(),
            fact: "test fact".to_owned(),
            confidence: 1.0,
            valid_from: "2026-01-01".to_owned(),
            valid_to: None,
            created_at: "2026-01-01".to_owned(),
            expired_at: None,
            source_message_id: None,
            qdrant_point_id: None,
            edge_type: EdgeType::Semantic,
            retrieval_count: 0,
            last_retrieved_at: None,
            superseded_by: None,
            supersedes: None,
            weight: 1.0,
        };

        ActivatedFact {
            edge,
            activation_score: 1.0,
            is_implicit_conflict: false,
            conflict_candidate_id: None,
        }
    }

    #[tokio::test]
    async fn annotate_conflicts_marks_flagged_edges() {
        let db = setup_test_db().await;
        let pool = db.pool();

        // Pending candidate linking edges 1 and 2; foreign keys are switched off in the
        // same statement batch because no edge rows are inserted.
        let seed = sqlx::query(
            "PRAGMA foreign_keys = OFF;
             INSERT INTO implicit_conflict_candidates
             (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at)
             VALUES (1, 2, 0.90, 'levenshtein', 'pending', 1000000, 9999999)",
        );
        seed.execute(pool).await.unwrap();

        let mut facts = vec![stub_fact(1), stub_fact(3)];

        let mut tx = zeph_db::begin(pool).await.unwrap();
        annotate_conflicts(&mut facts, &mut tx).await.unwrap();
        tx.commit().await.unwrap();

        let (flagged, untouched) = (&facts[0], &facts[1]);
        assert!(flagged.is_implicit_conflict, "edge 1 must be flagged");
        assert!(flagged.conflict_candidate_id.is_some());
        assert!(!untouched.is_implicit_conflict, "edge 3 must not be flagged");
        assert!(untouched.conflict_candidate_id.is_none());
    }

    #[tokio::test]
    async fn annotate_conflicts_empty_candidates_no_annotation() {
        let db = setup_test_db().await;
        let pool = db.pool();

        let mut facts = vec![stub_fact(10), stub_fact(20)];

        let mut tx = zeph_db::begin(pool).await.unwrap();
        annotate_conflicts(&mut facts, &mut tx).await.unwrap();
        tx.commit().await.unwrap();

        assert!(
            !facts[0].is_implicit_conflict,
            "no candidates → no annotation"
        );
        assert!(facts[1].conflict_candidate_id.is_none());
    }

    #[tokio::test]
    async fn annotate_conflicts_edge_b_side_also_flagged() {
        let db = setup_test_db().await;
        let pool = db.pool();

        // Edge 7 only appears on the `edge_b_id` side of the candidate.
        let seed = sqlx::query(
            "PRAGMA foreign_keys = OFF;
             INSERT INTO implicit_conflict_candidates
             (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at)
             VALUES (5, 7, 0.85, 'levenshtein', 'pending', 1000000, 9999999)",
        );
        seed.execute(pool).await.unwrap();

        let mut facts = vec![stub_fact(7)];

        let mut tx = zeph_db::begin(pool).await.unwrap();
        annotate_conflicts(&mut facts, &mut tx).await.unwrap();
        tx.commit().await.unwrap();

        assert!(
            facts[0].is_implicit_conflict,
            "edge on edge_b side must also be flagged"
        );
    }
}