/// Maximum number of edge ids sent in a single `annotate_conflicts` query.
///
/// Each id is bound twice in that query (once for `edge_a_id IN (...)`, once for
/// `edge_b_id IN (...)`), so a full chunk uses `2 * 499 = 998` parameters. That fits under
/// SQLite's historical default `SQLITE_MAX_VARIABLE_NUMBER` of 999.
const MAX_IDS_PER_QUERY: usize = 999 / 2;
16
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use zeph_config::ImplicitConflictConfig;
20use zeph_db::DbTransaction;
21
22use crate::error::MemoryError;
23use crate::graph::activation::ActivatedFact;
24
/// A pair of edges whose predicates look similar enough to be a possible implicit conflict.
///
/// Produced by `ImplicitConflictDetector::detect_candidates` and persisted by
/// `ImplicitConflictDetector::stage_candidates`.
#[derive(Debug, Clone, PartialEq)]
pub struct ConflictCandidate {
    /// Id of the new edge (the `new_edge_id` passed to detection).
    pub edge_a_id: i64,
    /// Id of the existing edge whose predicate resembles the new edge's predicate.
    pub edge_b_id: i64,
    /// Similarity score of the two predicates; for the `"levenshtein"` method this is
    /// `ImplicitConflictDetector::normalized_levenshtein`, in `[0.0, 1.0]`.
    pub similarity: f64,
    /// Name of the method that produced `similarity` (e.g. `"levenshtein"`).
    pub method: String,
}
37
/// Flags pairs of edges whose predicates are similar but not identical, measured by
/// normalized Levenshtein similarity.
///
/// See [`ImplicitConflictDetector::detect_candidates`] for how candidates are produced.
pub struct ImplicitConflictDetector {
    // Settings read by the detector: `enabled`, `conflict_similarity_threshold`,
    // and `candidate_ttl_days`.
    config: ImplicitConflictConfig,
}
57
58impl ImplicitConflictDetector {
59 #[must_use]
61 pub fn new(config: ImplicitConflictConfig) -> Self {
62 Self { config }
63 }
64
65 #[must_use]
82 pub fn detect_candidates(
83 &self,
84 new_edge_id: i64,
85 new_predicate: &str,
86 existing: &[(i64, &str)],
87 is_cardinality_n: bool,
88 ) -> Vec<ConflictCandidate> {
89 let _span = tracing::info_span!(
90 "memory.graph.implicit_conflict.detect",
91 predicate = new_predicate,
92 )
93 .entered();
94
95 if !self.config.enabled || is_cardinality_n || existing.is_empty() {
96 return Vec::new();
97 }
98
99 let threshold = self.config.conflict_similarity_threshold;
100 let mut candidates = Vec::new();
101
102 for &(edge_id, predicate) in existing {
103 if predicate == new_predicate {
104 continue;
106 }
107 let sim = Self::normalized_levenshtein(new_predicate, predicate);
108 if sim >= threshold {
109 candidates.push(ConflictCandidate {
110 edge_a_id: new_edge_id,
111 edge_b_id: edge_id,
112 similarity: sim,
113 method: "levenshtein".to_owned(),
114 });
115 }
116 }
117
118 candidates
119 }
120
121 #[tracing::instrument(skip_all, name = "memory.graph.implicit_conflict.stage", fields(count = candidates.len()))]
130 pub async fn stage_candidates(
131 &self,
132 candidates: &[ConflictCandidate],
133 tx: &mut DbTransaction<'_>,
134 ttl_days: u32,
135 ) -> Result<(), MemoryError> {
136 if candidates.is_empty() {
137 return Ok(());
138 }
139
140 #[allow(clippy::cast_possible_wrap)]
141 let now = SystemTime::now()
142 .duration_since(UNIX_EPOCH)
143 .unwrap_or_default()
144 .as_secs() as i64;
145 let ttl_secs = i64::from(ttl_days) * 86_400;
146 let expires_at = now + ttl_secs;
147
148 for c in candidates {
149 sqlx::query(
150 "INSERT INTO implicit_conflict_candidates
151 (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at)
152 VALUES (?, ?, ?, ?, 'pending', ?, ?)",
153 )
154 .bind(c.edge_a_id)
155 .bind(c.edge_b_id)
156 .bind(c.similarity)
157 .bind(&c.method)
158 .bind(now)
159 .bind(expires_at)
160 .execute(&mut **tx)
161 .await
162 .map_err(MemoryError::from)?;
163 }
164
165 Ok(())
166 }
167
168 #[must_use]
173 pub fn normalized_levenshtein(a: &str, b: &str) -> f64 {
174 if a == b {
175 return 1.0;
176 }
177 let len_a = a.chars().count();
178 let len_b = b.chars().count();
179 if len_a == 0 && len_b == 0 {
180 return 1.0;
181 }
182 if len_a == 0 || len_b == 0 {
183 return 0.0;
184 }
185 let dist = levenshtein_distance(a, b);
186 let max_len = len_a.max(len_b);
187 #[allow(clippy::cast_precision_loss)]
188 let result = 1.0 - (dist as f64 / max_len as f64);
189 result
190 }
191
192 #[must_use]
194 pub fn is_enabled(&self) -> bool {
195 self.config.enabled
196 }
197
198 #[must_use]
200 pub fn candidate_ttl_days(&self) -> u32 {
201 self.config.candidate_ttl_days
202 }
203}
204
205#[tracing::instrument(skip_all, name = "memory.graph.implicit_conflict.annotate", fields(facts = facts.len()))]
214pub async fn annotate_conflicts(
215 facts: &mut [ActivatedFact],
216 tx: &mut DbTransaction<'_>,
217) -> Result<(), MemoryError> {
218 if facts.is_empty() {
219 return Ok(());
220 }
221
222 let edge_ids: Vec<i64> = facts.iter().map(|f| f.edge.id).collect();
223
224 let mut edge_to_candidate: std::collections::HashMap<i64, i64> =
225 std::collections::HashMap::new();
226
227 for chunk in edge_ids.chunks(MAX_IDS_PER_QUERY) {
228 let placeholders: String = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
229 let query_str = format!(
230 "SELECT id, edge_a_id, edge_b_id
231 FROM implicit_conflict_candidates
232 WHERE status = 'pending'
233 AND (edge_a_id IN ({placeholders}) OR edge_b_id IN ({placeholders}))",
234 );
235
236 let mut q = sqlx::query(&query_str);
237 for id in chunk {
238 q = q.bind(id);
239 }
240 for id in chunk {
242 q = q.bind(id);
243 }
244
245 let rows = q.fetch_all(&mut **tx).await.map_err(MemoryError::from)?;
246
247 for row in rows {
248 use sqlx::Row as _;
249 let candidate_id: i64 = row.try_get("id").map_err(MemoryError::from)?;
250 let ea: i64 = row.try_get("edge_a_id").map_err(MemoryError::from)?;
251 let eb: i64 = row.try_get("edge_b_id").map_err(MemoryError::from)?;
252 edge_to_candidate.entry(ea).or_insert(candidate_id);
253 edge_to_candidate.entry(eb).or_insert(candidate_id);
254 }
255 }
256
257 for fact in facts.iter_mut() {
258 if let Some(&cid) = edge_to_candidate.get(&fact.edge.id) {
259 fact.is_implicit_conflict = true;
260 fact.conflict_candidate_id = Some(cid);
261 }
262 }
263
264 Ok(())
265}
266
/// Computes the Levenshtein edit distance between `a` and `b`.
///
/// Insertions, deletions and substitutions each cost 1 and operate on `char`s, not bytes.
/// Only one row of the dynamic-programming table is kept. The diagonal cell of the previous
/// row is carried in `diag` while `row` is overwritten left to right.
fn levenshtein_distance(a: &str, b: &str) -> usize {
    let target: Vec<char> = b.chars().collect();
    let width = target.len();

    // Row for an empty prefix of `a`: turning "" into target[..j] costs j.
    let mut row: Vec<usize> = (0..=width).collect();

    for (i, source_char) in a.chars().enumerate() {
        let mut diag = row[0];
        row[0] = i + 1;
        for (j, &target_char) in target.iter().enumerate() {
            let above = row[j + 1];
            let substitute = diag + usize::from(source_char != target_char);
            let delete = above + 1;
            let insert = row[j] + 1;
            row[j + 1] = delete.min(insert).min(substitute);
            diag = above;
        }
    }

    row[width]
}
288
#[cfg(test)]
mod tests {
    use super::*;
    use zeph_config::ImplicitConflictConfig;

    /// Builds a detector with a 0.80 similarity threshold and default remaining settings.
    fn detector(enabled: bool) -> ImplicitConflictDetector {
        ImplicitConflictDetector::new(ImplicitConflictConfig {
            enabled,
            conflict_similarity_threshold: 0.80,
            ..Default::default()
        })
    }

    #[test]
    fn levenshtein_distance_known_values() {
        assert_eq!(levenshtein_distance("kitten", "sitting"), 3);
        assert_eq!(levenshtein_distance("flaw", "lawn"), 2);
        assert_eq!(levenshtein_distance("same", "same"), 0);
        assert_eq!(levenshtein_distance("", "abc"), 3);
        assert_eq!(levenshtein_distance("abc", ""), 3);
    }

    #[test]
    fn levenshtein_distance_counts_chars_not_bytes() {
        // "é" is two bytes in UTF-8 but a single substituted char.
        assert_eq!(levenshtein_distance("café", "cafe"), 1);
    }

    #[test]
    fn normalized_levenshtein_identical() {
        assert!(
            (ImplicitConflictDetector::normalized_levenshtein("uses", "uses") - 1.0).abs()
                < f64::EPSILON
        );
    }

    #[test]
    fn normalized_levenshtein_empty_both() {
        assert!(
            (ImplicitConflictDetector::normalized_levenshtein("", "") - 1.0).abs() < f64::EPSILON
        );
    }

    #[test]
    fn normalized_levenshtein_empty_one() {
        assert!(ImplicitConflictDetector::normalized_levenshtein("", "abc").abs() < f64::EPSILON);
        assert!(ImplicitConflictDetector::normalized_levenshtein("abc", "").abs() < f64::EPSILON);
    }

    #[test]
    fn normalized_levenshtein_single_edit() {
        // One insertion over a max length of 7 chars.
        let sim = ImplicitConflictDetector::normalized_levenshtein("employ", "employs");
        assert!((sim - 6.0 / 7.0).abs() < 1e-9, "got {sim}");
    }

    #[test]
    fn normalized_levenshtein_completely_different() {
        let sim = ImplicitConflictDetector::normalized_levenshtein("uses", "xyz_unrelated_value");
        assert!(sim < 0.5, "expected low similarity, got {sim}");
    }

    #[test]
    fn detect_candidates_above_threshold_returns_candidate() {
        let d = detector(true);
        let candidates = d.detect_candidates(42, "employ", &[(7, "employs")], false);
        assert_eq!(candidates.len(), 1, "expected one candidate");
        assert_eq!(candidates[0].edge_a_id, 42);
        assert_eq!(candidates[0].edge_b_id, 7);
        assert!(candidates[0].similarity >= 0.80);
        assert_eq!(candidates[0].method, "levenshtein");
    }

    #[test]
    fn detect_candidates_below_threshold_returns_empty() {
        let d = detector(true);
        let candidates = d.detect_candidates(1, "uses", &[(2, "xyz_unrelated")], false);
        assert!(
            candidates.is_empty(),
            "expected no candidates below threshold"
        );
    }

    #[test]
    fn detect_candidates_identical_predicate_skipped() {
        let d = detector(true);
        let candidates = d.detect_candidates(1, "uses", &[(2, "uses")], false);
        assert!(
            candidates.is_empty(),
            "identical predicates must not create candidates"
        );
    }

    #[test]
    fn detect_candidates_mixed_existing_returns_only_similar() {
        let d = detector(true);
        let candidates = d.detect_candidates(
            1,
            "employ",
            &[(2, "employs"), (3, "xyz_unrelated"), (4, "employ")],
            false,
        );
        assert_eq!(candidates.len(), 1, "only the near-miss must be reported");
        assert_eq!(candidates[0].edge_b_id, 2);
    }

    #[test]
    fn detect_candidates_empty_existing_returns_empty() {
        let d = detector(true);
        assert!(d.detect_candidates(1, "employ", &[], false).is_empty());
    }

    #[test]
    fn detect_candidates_disabled_returns_empty() {
        let d = detector(false);
        let candidates = d.detect_candidates(1, "employ", &[(2, "employs")], false);
        assert!(candidates.is_empty(), "disabled detector must return empty");
    }

    #[test]
    fn detect_candidates_cardinality_n_skipped() {
        let d = detector(true);
        let candidates = d.detect_candidates(1, "employ", &[(2, "employs")], true);
        assert!(
            candidates.is_empty(),
            "cardinality-n predicate must be skipped"
        );
    }

    async fn setup_test_db() -> crate::store::SqliteStore {
        crate::store::SqliteStore::new(":memory:").await.unwrap()
    }

    /// Builds an `ActivatedFact` for `edge_id` with placeholder edge data and no conflict flag.
    fn stub_fact(edge_id: i64) -> ActivatedFact {
        use crate::graph::types::{Edge, EdgeType};
        ActivatedFact {
            edge: Edge {
                id: edge_id,
                source_entity_id: 1,
                target_entity_id: 2,
                relation: "test".to_owned(),
                canonical_relation: "test".to_owned(),
                fact: "test fact".to_owned(),
                confidence: 1.0,
                valid_from: "2026-01-01".to_owned(),
                valid_to: None,
                created_at: "2026-01-01".to_owned(),
                expired_at: None,
                source_message_id: None,
                qdrant_point_id: None,
                edge_type: EdgeType::Semantic,
                retrieval_count: 0,
                last_retrieved_at: None,
                superseded_by: None,
                supersedes: None,
                weight: 1.0,
                confidence_fast: 1.0,
                confidence_slow: 1.0,
                turn_index: None,
            },
            activation_score: 1.0,
            is_implicit_conflict: false,
            conflict_candidate_id: None,
        }
    }

    #[tokio::test]
    async fn annotate_conflicts_marks_flagged_edges() {
        let db = setup_test_db().await;
        let pool = db.pool();

        // Foreign keys are disabled so candidate rows can reference edges that do not exist.
        // `expires_at` is in the past, which is fine because `annotate_conflicts` filters
        // only on `status`.
        sqlx::query(
            "PRAGMA foreign_keys = OFF;
             INSERT INTO implicit_conflict_candidates
             (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at)
             VALUES (1, 2, 0.90, 'levenshtein', 'pending', 1000000, 9999999)",
        )
        .execute(pool)
        .await
        .unwrap();

        let mut facts = vec![stub_fact(1), stub_fact(3)];

        let mut tx = zeph_db::begin(pool).await.unwrap();
        annotate_conflicts(&mut facts, &mut tx).await.unwrap();
        tx.commit().await.unwrap();

        assert!(facts[0].is_implicit_conflict, "edge 1 must be flagged");
        assert!(facts[0].conflict_candidate_id.is_some());
        assert!(!facts[1].is_implicit_conflict, "edge 3 must not be flagged");
        assert!(facts[1].conflict_candidate_id.is_none());
    }

    #[tokio::test]
    async fn annotate_conflicts_empty_candidates_no_annotation() {
        let db = setup_test_db().await;
        let pool = db.pool();

        let mut facts = vec![stub_fact(10), stub_fact(20)];

        let mut tx = zeph_db::begin(pool).await.unwrap();
        annotate_conflicts(&mut facts, &mut tx).await.unwrap();
        tx.commit().await.unwrap();

        for fact in &facts {
            assert!(!fact.is_implicit_conflict, "no candidates → no annotation");
            assert!(fact.conflict_candidate_id.is_none());
        }
    }

    #[tokio::test]
    async fn annotate_conflicts_empty_facts_is_noop() {
        let db = setup_test_db().await;
        let pool = db.pool();

        let mut facts: Vec<ActivatedFact> = Vec::new();

        let mut tx = zeph_db::begin(pool).await.unwrap();
        annotate_conflicts(&mut facts, &mut tx).await.unwrap();
        tx.commit().await.unwrap();

        assert!(facts.is_empty());
    }

    #[tokio::test]
    async fn annotate_conflicts_edge_b_side_also_flagged() {
        let db = setup_test_db().await;
        let pool = db.pool();

        sqlx::query(
            "PRAGMA foreign_keys = OFF;
             INSERT INTO implicit_conflict_candidates
             (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at)
             VALUES (5, 7, 0.85, 'levenshtein', 'pending', 1000000, 9999999)",
        )
        .execute(pool)
        .await
        .unwrap();

        let mut facts = vec![stub_fact(7)];

        let mut tx = zeph_db::begin(pool).await.unwrap();
        annotate_conflicts(&mut facts, &mut tx).await.unwrap();
        tx.commit().await.unwrap();

        assert!(
            facts[0].is_implicit_conflict,
            "edge on edge_b side must also be flagged"
        );
        assert!(facts[0].conflict_candidate_id.is_some());
    }
}