1use std::time::Duration;
31
32use zeph_llm::any::AnyProvider;
33use zeph_llm::provider::{LlmProvider as _, Message, Role};
34
35use crate::error::MemoryError;
36use crate::graph::types::Edge;
37
/// Policy used to choose a single winner among conflicting edges.
///
/// The resolver only consults the strategy when it is handed two or more
/// candidates.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConflictStrategy {
    /// Pick the candidate whose `valid_from` compares greatest.
    Recency,
    /// Pick the candidate with the highest `confidence`.
    Confidence,
    /// Ask the configured LLM provider for the winning index; the resolver
    /// falls back to recency whenever the LLM cannot be used or does not
    /// return a valid index.
    Llm,
}
52
/// Depth cap (64) for supersede handling.
///
/// NOTE(review): nothing in this part of the file reads the constant;
/// presumably it bounds how far a chain of `superseded_by` / `supersedes`
/// links is followed — confirm at the use site.
pub const SUPERSEDE_DEPTH_CAP: usize = 64;
56
57pub struct ConflictResult {
59 pub winner: Edge,
61 pub alternatives: Vec<Edge>,
63}
64
65pub struct ConflictResolver {
67 strategy: ConflictStrategy,
68 timeout: Duration,
69 llm_budget: std::sync::atomic::AtomicI32,
71 retain_alternatives: bool,
72 llm_provider: Option<AnyProvider>,
74}
75
76impl ConflictResolver {
77 #[must_use]
84 pub fn new(
85 strategy: ConflictStrategy,
86 timeout_ms: u64,
87 llm_budget_per_turn: usize,
88 retain_alternatives: bool,
89 ) -> Self {
90 let budget = i32::try_from(llm_budget_per_turn).unwrap_or(i32::MAX);
91 Self {
92 strategy,
93 timeout: Duration::from_millis(timeout_ms),
94 llm_budget: std::sync::atomic::AtomicI32::new(budget),
95 retain_alternatives,
96 llm_provider: None,
97 }
98 }
99
100 #[must_use]
102 pub fn with_llm_provider(mut self, provider: AnyProvider) -> Self {
103 self.llm_provider = Some(provider);
104 self
105 }
106
107 pub fn reset_turn_budget(&self, budget: usize) {
109 let budget_i32 = i32::try_from(budget).unwrap_or(i32::MAX);
110 self.llm_budget
111 .store(budget_i32, std::sync::atomic::Ordering::Relaxed);
112 }
113
114 pub async fn resolve(
122 &self,
123 mut candidates: Vec<Edge>,
124 metrics: &ApexMetrics,
125 ) -> Result<ConflictResult, MemoryError> {
126 tracing::debug!(target: "memory.graph.apex.conflict_resolve", candidates = candidates.len());
127
128 if candidates.is_empty() {
129 return Err(MemoryError::InvalidInput(
130 "conflict resolver called with empty candidate list".into(),
131 ));
132 }
133 if candidates.len() == 1 {
134 return Ok(ConflictResult {
135 winner: candidates.remove(0),
136 alternatives: Vec::new(),
137 });
138 }
139
140 let effective_strategy = self.effective_strategy();
141 let winner_idx = match effective_strategy {
142 ConflictStrategy::Recency => recency_winner(&candidates),
143 ConflictStrategy::Confidence => confidence_winner(&candidates),
144 ConflictStrategy::Llm => self.llm_winner(&candidates, metrics).await,
145 };
146
147 metrics
148 .conflicts_total
149 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
150
151 let winner = candidates.remove(winner_idx);
152 let alternatives = if self.retain_alternatives {
153 candidates
154 } else {
155 Vec::new()
156 };
157 Ok(ConflictResult {
158 winner,
159 alternatives,
160 })
161 }
162
163 fn effective_strategy(&self) -> ConflictStrategy {
165 if self.strategy == ConflictStrategy::Llm {
166 let remaining = self.llm_budget.load(std::sync::atomic::Ordering::Relaxed);
167 if remaining <= 0 {
168 return ConflictStrategy::Recency;
169 }
170 }
171 self.strategy.clone()
172 }
173
174 async fn llm_winner(&self, candidates: &[Edge], metrics: &ApexMetrics) -> usize {
175 tracing::debug!(target: "memory.graph.apex.conflict_llm", candidates = candidates.len());
176 self.llm_budget
177 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
178
179 let Some(provider) = &self.llm_provider else {
180 return recency_winner(candidates);
182 };
183
184 let prompt = build_conflict_prompt(candidates);
185 let messages = [
186 Message::from_legacy(
187 Role::System,
188 "You are a knowledge graph conflict resolver. Given a list of conflicting \
189 edge facts indexed from 0, respond with only the index of the most \
190 authoritative or recent fact. Output a single integer and nothing else.",
191 ),
192 Message::from_legacy(Role::User, prompt),
193 ];
194
195 let timeout = self.timeout;
196 match tokio::time::timeout(timeout, provider.chat(&messages)).await {
197 Ok(Ok(response)) => {
198 let trimmed = response.trim();
199 if let Ok(idx) = trimmed.parse::<usize>()
200 && idx < candidates.len()
201 {
202 return idx;
203 }
204 tracing::warn!(
205 raw = %trimmed,
206 "apex_mem: LLM conflict resolver returned unparseable index, falling back to recency"
207 );
208 recency_winner(candidates)
209 }
210 Ok(Err(e)) => {
211 tracing::warn!(error = %e,
212 "apex_mem: LLM conflict resolver call failed, falling back to recency");
213 recency_winner(candidates)
214 }
215 Err(_) => {
216 metrics
217 .llm_timeouts_total
218 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
219 tracing::warn!(
220 "apex_mem: LLM conflict resolver timed out after {}ms, falling back to recency",
221 timeout.as_millis()
222 );
223 recency_winner(candidates)
224 }
225 }
226 }
227}
228
229fn build_conflict_prompt(candidates: &[Edge]) -> String {
230 let mut lines = String::from("Conflicting facts for the same predicate:\n");
231 for (i, edge) in candidates.iter().enumerate() {
232 use std::fmt::Write as _;
233 let _ = writeln!(lines, "{i}: [{}] {}", edge.valid_from, edge.fact);
234 }
235 lines.push_str(
236 "\nWhich index (0-based) is the most authoritative? Respond with only the integer.",
237 );
238 lines
239}
240
241fn recency_winner(candidates: &[Edge]) -> usize {
242 candidates
243 .iter()
244 .enumerate()
245 .max_by(|(_, a), (_, b)| a.valid_from.cmp(&b.valid_from))
246 .map_or(0, |(i, _)| i)
247}
248
249fn confidence_winner(candidates: &[Edge]) -> usize {
250 candidates
251 .iter()
252 .enumerate()
253 .max_by(|(_, a), (_, b)| {
254 a.confidence
255 .partial_cmp(&b.confidence)
256 .unwrap_or(std::cmp::Ordering::Equal)
257 })
258 .map_or(0, |(i, _)| i)
259}
260
/// Monotonic counters describing conflict-resolution activity.
///
/// Every field is an atomic, so the counters can be bumped through a shared
/// reference; all of them start at zero via `Default`.
#[derive(Default, Debug)]
pub struct ApexMetrics {
    /// NOTE(review): not updated in this part of the file; presumably counts
    /// supersede operations — confirm at the call site.
    pub supersedes_total: std::sync::atomic::AtomicU64,
    /// Incremented once per resolution that involved two or more candidates.
    pub conflicts_total: std::sync::atomic::AtomicU64,
    /// Incremented each time an LLM conflict-resolution call times out.
    pub llm_timeouts_total: std::sync::atomic::AtomicU64,
    /// NOTE(review): not updated in this part of the file; presumably counts
    /// predicates without a canonical mapping — confirm at the call site.
    pub unmapped_predicates_total: std::sync::atomic::AtomicU64,
}
277
278impl ApexMetrics {
279 #[must_use]
281 pub fn snapshot(&self) -> Vec<(&'static str, u64)> {
282 vec![
283 (
284 "apex_mem_supersedes_total",
285 self.supersedes_total
286 .load(std::sync::atomic::Ordering::Relaxed),
287 ),
288 (
289 "apex_mem_conflicts_total",
290 self.conflicts_total
291 .load(std::sync::atomic::Ordering::Relaxed),
292 ),
293 (
294 "apex_mem_llm_timeouts_total",
295 self.llm_timeouts_total
296 .load(std::sync::atomic::Ordering::Relaxed),
297 ),
298 (
299 "apex_mem_unmapped_predicates_total",
300 self.unmapped_predicates_total
301 .load(std::sync::atomic::Ordering::Relaxed),
302 ),
303 ]
304 }
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `works_at` edge between entities 1 and 2 with the given id,
    /// timestamp (used for both `valid_from` and `created_at`) and confidence.
    fn make_edge(id: i64, valid_from: &str, confidence: f32) -> Edge {
        let timestamp = valid_from.to_string();
        Edge {
            id,
            source_entity_id: 1,
            target_entity_id: 2,
            relation: "works_at".into(),
            canonical_relation: "works_at".into(),
            fact: "fact".into(),
            confidence,
            valid_from: timestamp.clone(),
            valid_to: None,
            created_at: timestamp,
            expired_at: None,
            source_message_id: None,
            qdrant_point_id: None,
            edge_type: crate::graph::types::EdgeType::Semantic,
            retrieval_count: 0,
            last_retrieved_at: None,
            superseded_by: None,
            supersedes: None,
        }
    }

    /// Edge 1 is oldest and most confident, edge 2 is newest and least confident.
    fn old_and_new() -> Vec<Edge> {
        vec![
            make_edge(1, "2026-01-01 00:00:00", 0.9),
            make_edge(2, "2026-06-01 00:00:00", 0.5),
        ]
    }

    /// `old_and_new` plus a third edge that sits in the middle on both axes.
    fn three_candidates() -> Vec<Edge> {
        let mut edges = old_and_new();
        edges.push(make_edge(3, "2026-03-01 00:00:00", 0.7));
        edges
    }

    /// Runs `resolve` against fresh metrics and unwraps the outcome.
    async fn resolve_with(resolver: &ConflictResolver, candidates: Vec<Edge>) -> ConflictResult {
        let counters = ApexMetrics::default();
        resolver.resolve(candidates, &counters).await.unwrap()
    }

    #[tokio::test]
    async fn recency_strategy_picks_newest() {
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, false);
        let outcome = resolve_with(&resolver, three_candidates()).await;
        assert_eq!(outcome.winner.id, 2, "newest valid_from wins");
    }

    #[tokio::test]
    async fn confidence_strategy_picks_highest() {
        let resolver = ConflictResolver::new(ConflictStrategy::Confidence, 500, 3, false);
        let outcome = resolve_with(&resolver, three_candidates()).await;
        assert_eq!(outcome.winner.id, 1);
    }

    #[tokio::test]
    async fn single_candidate_passes_through() {
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, false);
        let lone = vec![make_edge(42, "2026-01-01 00:00:00", 0.8)];
        let outcome = resolve_with(&resolver, lone).await;
        assert_eq!(outcome.winner.id, 42);
        assert!(outcome.alternatives.is_empty());
    }

    #[tokio::test]
    async fn retain_alternatives_when_enabled() {
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, true);
        let outcome = resolve_with(&resolver, old_and_new()).await;
        assert_eq!(outcome.winner.id, 2);
        assert_eq!(outcome.alternatives.len(), 1);
        assert_eq!(outcome.alternatives[0].id, 1);
    }

    #[tokio::test]
    async fn budget_exhaustion_falls_back_to_recency() {
        // A budget of zero means the LLM strategy must behave like recency,
        // so no provider is needed for this test.
        let resolver = ConflictResolver::new(ConflictStrategy::Llm, 500, 0, false);
        let outcome = resolve_with(&resolver, old_and_new()).await;
        assert_eq!(outcome.winner.id, 2);
    }

    #[test]
    fn metrics_snapshot_has_four_entries() {
        let counters = ApexMetrics::default();
        assert_eq!(counters.snapshot().len(), 4);
    }
}