1use std::time::Duration;
31
32use zeph_llm::any::AnyProvider;
33use zeph_llm::provider::{LlmProvider as _, Message, Role};
34
35use crate::error::MemoryError;
36use crate::graph::types::Edge;
37
/// Strategy a [`ConflictResolver`] uses to choose one winner among conflicting edges.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ConflictStrategy {
    /// Pick the candidate with the greatest `valid_from`.
    Recency,
    /// Pick the candidate with the highest `confidence`.
    Confidence,
    /// Ask the configured LLM provider to pick the winner. The resolver falls
    /// back to recency when the LLM budget is spent, no provider is set, or the
    /// call fails, times out, or returns an unusable answer.
    Llm,
}
53
/// Depth cap (64) exported for supersede handling.
///
/// NOTE(review): nothing in this file reads the constant; presumably it bounds
/// how far a chain of superseding edges is followed — confirm against callers.
pub const SUPERSEDE_DEPTH_CAP: usize = 64;
57
/// Outcome of [`ConflictResolver::resolve`].
pub struct ConflictResult {
    /// The edge selected by the active strategy.
    pub winner: Edge,
    /// The losing candidates in their original relative order. Empty when the
    /// resolver was built with `retain_alternatives = false`, and always empty
    /// for a single-candidate input.
    pub alternatives: Vec<Edge>,
}
65
/// Picks one winning edge out of a set of conflicting candidates.
pub struct ConflictResolver {
    /// Strategy requested at construction time.
    strategy: ConflictStrategy,
    /// Upper bound on the duration of a single LLM call made by the `Llm` strategy.
    timeout: Duration,
    /// Remaining LLM calls. Once this is zero or negative the `Llm` strategy
    /// degrades to recency; `reset_turn_budget` overwrites the value.
    llm_budget: std::sync::atomic::AtomicI32,
    /// Whether `resolve` hands the losing candidates back to the caller.
    retain_alternatives: bool,
    /// Provider used by the `Llm` strategy; `None` until `with_llm_provider` is called.
    llm_provider: Option<AnyProvider>,
}
76
77impl ConflictResolver {
78 #[must_use]
85 pub fn new(
86 strategy: ConflictStrategy,
87 timeout_ms: u64,
88 llm_budget_per_turn: usize,
89 retain_alternatives: bool,
90 ) -> Self {
91 let budget = i32::try_from(llm_budget_per_turn).unwrap_or(i32::MAX);
92 Self {
93 strategy,
94 timeout: Duration::from_millis(timeout_ms),
95 llm_budget: std::sync::atomic::AtomicI32::new(budget),
96 retain_alternatives,
97 llm_provider: None,
98 }
99 }
100
101 #[must_use]
103 pub fn with_llm_provider(mut self, provider: AnyProvider) -> Self {
104 self.llm_provider = Some(provider);
105 self
106 }
107
108 pub fn reset_turn_budget(&self, budget: usize) {
110 let budget_i32 = i32::try_from(budget).unwrap_or(i32::MAX);
111 self.llm_budget
112 .store(budget_i32, std::sync::atomic::Ordering::Relaxed);
113 }
114
115 pub async fn resolve(
123 &self,
124 mut candidates: Vec<Edge>,
125 metrics: &ApexMetrics,
126 ) -> Result<ConflictResult, MemoryError> {
127 tracing::debug!(target: "memory.graph.apex.conflict_resolve", candidates = candidates.len());
128
129 if candidates.is_empty() {
130 return Err(MemoryError::InvalidInput(
131 "conflict resolver called with empty candidate list".into(),
132 ));
133 }
134 if candidates.len() == 1 {
135 return Ok(ConflictResult {
136 winner: candidates.remove(0),
137 alternatives: Vec::new(),
138 });
139 }
140
141 let effective_strategy = self.effective_strategy();
142 let winner_idx = match effective_strategy {
143 ConflictStrategy::Recency => recency_winner(&candidates),
144 ConflictStrategy::Confidence => confidence_winner(&candidates),
145 ConflictStrategy::Llm => self.llm_winner(&candidates, metrics).await,
146 };
147
148 metrics
149 .conflicts_total
150 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
151
152 let winner = candidates.remove(winner_idx);
153 let alternatives = if self.retain_alternatives {
154 candidates
155 } else {
156 Vec::new()
157 };
158 Ok(ConflictResult {
159 winner,
160 alternatives,
161 })
162 }
163
164 fn effective_strategy(&self) -> ConflictStrategy {
166 if self.strategy == ConflictStrategy::Llm {
167 let remaining = self.llm_budget.load(std::sync::atomic::Ordering::Relaxed);
168 if remaining <= 0 {
169 return ConflictStrategy::Recency;
170 }
171 }
172 self.strategy.clone()
173 }
174
175 async fn llm_winner(&self, candidates: &[Edge], metrics: &ApexMetrics) -> usize {
176 tracing::debug!(target: "memory.graph.apex.conflict_llm", candidates = candidates.len());
177 self.llm_budget
178 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
179
180 let Some(provider) = &self.llm_provider else {
181 return recency_winner(candidates);
183 };
184
185 let prompt = build_conflict_prompt(candidates);
186 let messages = [
187 Message::from_legacy(
188 Role::System,
189 "You are a knowledge graph conflict resolver. Given a list of conflicting \
190 edge facts indexed from 0, respond with only the index of the most \
191 authoritative or recent fact. Output a single integer and nothing else.",
192 ),
193 Message::from_legacy(Role::User, prompt),
194 ];
195
196 let timeout = self.timeout;
197 match tokio::time::timeout(timeout, provider.chat(&messages)).await {
198 Ok(Ok(response)) => {
199 let trimmed = response.trim();
200 if let Ok(idx) = trimmed.parse::<usize>()
201 && idx < candidates.len()
202 {
203 return idx;
204 }
205 tracing::warn!(
206 raw = %trimmed,
207 "apex_mem: LLM conflict resolver returned unparseable index, falling back to recency"
208 );
209 recency_winner(candidates)
210 }
211 Ok(Err(e)) => {
212 tracing::warn!(error = %e,
213 "apex_mem: LLM conflict resolver call failed, falling back to recency");
214 recency_winner(candidates)
215 }
216 Err(_) => {
217 metrics
218 .llm_timeouts_total
219 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
220 tracing::warn!(
221 "apex_mem: LLM conflict resolver timed out after {}ms, falling back to recency",
222 timeout.as_millis()
223 );
224 recency_winner(candidates)
225 }
226 }
227 }
228}
229
230fn build_conflict_prompt(candidates: &[Edge]) -> String {
231 let mut lines = String::from("Conflicting facts for the same predicate:\n");
232 for (i, edge) in candidates.iter().enumerate() {
233 use std::fmt::Write as _;
234 let _ = writeln!(lines, "{i}: [{}] {}", edge.valid_from, edge.fact);
235 }
236 lines.push_str(
237 "\nWhich index (0-based) is the most authoritative? Respond with only the integer.",
238 );
239 lines
240}
241
242fn recency_winner(candidates: &[Edge]) -> usize {
243 candidates
244 .iter()
245 .enumerate()
246 .max_by(|(_, a), (_, b)| a.valid_from.cmp(&b.valid_from))
247 .map_or(0, |(i, _)| i)
248}
249
250fn confidence_winner(candidates: &[Edge]) -> usize {
251 candidates
252 .iter()
253 .enumerate()
254 .max_by(|(_, a), (_, b)| {
255 a.confidence
256 .partial_cmp(&b.confidence)
257 .unwrap_or(std::cmp::Ordering::Equal)
258 })
259 .map_or(0, |(i, _)| i)
260}
261
/// Monotonic counters exposed through [`ApexMetrics::snapshot`].
#[derive(Debug, Default)]
pub struct ApexMetrics {
    /// Reported as `apex_mem_supersedes_total`.
    /// NOTE(review): not written in this file; presumably counts supersede operations — confirm.
    pub supersedes_total: std::sync::atomic::AtomicU64,
    /// Reported as `apex_mem_conflicts_total`; bumped once per multi-candidate resolution.
    pub conflicts_total: std::sync::atomic::AtomicU64,
    /// Reported as `apex_mem_llm_timeouts_total`; bumped when an LLM call times out.
    pub llm_timeouts_total: std::sync::atomic::AtomicU64,
    /// Reported as `apex_mem_unmapped_predicates_total`.
    /// NOTE(review): not written in this file; presumably counts predicates without a mapping — confirm.
    pub unmapped_predicates_total: std::sync::atomic::AtomicU64,
}

impl ApexMetrics {
    /// Reads every counter and returns `(metric name, value)` pairs in a fixed order.
    #[must_use]
    pub fn snapshot(&self) -> Vec<(&'static str, u64)> {
        use std::sync::atomic::Ordering::Relaxed;

        let table = [
            ("apex_mem_supersedes_total", &self.supersedes_total),
            ("apex_mem_conflicts_total", &self.conflicts_total),
            ("apex_mem_llm_timeouts_total", &self.llm_timeouts_total),
            (
                "apex_mem_unmapped_predicates_total",
                &self.unmapped_predicates_total,
            ),
        ];
        table
            .iter()
            .map(|&(label, counter)| (label, counter.load(Relaxed)))
            .collect()
    }
}
307
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `works_at` edge between entities 1 and 2; only `id`,
    /// `valid_from` (also used for `created_at`) and `confidence` vary.
    fn make_edge(id: i64, valid_from: &str, confidence: f32) -> Edge {
        let timestamp = valid_from.to_owned();
        Edge {
            id,
            source_entity_id: 1,
            target_entity_id: 2,
            relation: "works_at".into(),
            canonical_relation: "works_at".into(),
            fact: "fact".into(),
            confidence,
            valid_from: timestamp.clone(),
            valid_to: None,
            created_at: timestamp,
            expired_at: None,
            source_message_id: None,
            qdrant_point_id: None,
            edge_type: crate::graph::types::EdgeType::Semantic,
            retrieval_count: 0,
            last_retrieved_at: None,
            superseded_by: None,
            supersedes: None,
            weight: 1.0,
        }
    }

    /// Two edges: id 1 is older with higher confidence, id 2 is newer with lower confidence.
    fn old_and_new() -> Vec<Edge> {
        vec![
            make_edge(1, "2026-01-01 00:00:00", 0.9),
            make_edge(2, "2026-06-01 00:00:00", 0.5),
        ]
    }

    /// The pair from `old_and_new` plus a third edge that is in between on both axes.
    fn three_way() -> Vec<Edge> {
        let mut edges = old_and_new();
        edges.push(make_edge(3, "2026-03-01 00:00:00", 0.7));
        edges
    }

    /// Runs one resolution with a 500 ms timeout and fresh metrics, unwrapping the result.
    async fn resolve_with(
        strategy: ConflictStrategy,
        llm_budget: usize,
        retain_alternatives: bool,
        edges: Vec<Edge>,
    ) -> ConflictResult {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(strategy, 500, llm_budget, retain_alternatives);
        resolver.resolve(edges, &metrics).await.unwrap()
    }

    #[tokio::test]
    async fn recency_strategy_picks_newest() {
        let outcome = resolve_with(ConflictStrategy::Recency, 3, false, three_way()).await;
        assert_eq!(outcome.winner.id, 2, "newest valid_from wins");
    }

    #[tokio::test]
    async fn confidence_strategy_picks_highest() {
        let outcome = resolve_with(ConflictStrategy::Confidence, 3, false, three_way()).await;
        assert_eq!(outcome.winner.id, 1);
    }

    #[tokio::test]
    async fn single_candidate_passes_through() {
        let only = vec![make_edge(42, "2026-01-01 00:00:00", 0.8)];
        let outcome = resolve_with(ConflictStrategy::Recency, 3, false, only).await;
        assert_eq!(outcome.winner.id, 42);
        assert!(outcome.alternatives.is_empty());
    }

    #[tokio::test]
    async fn retain_alternatives_when_enabled() {
        let outcome = resolve_with(ConflictStrategy::Recency, 3, true, old_and_new()).await;
        assert_eq!(outcome.winner.id, 2);
        assert_eq!(outcome.alternatives.len(), 1);
        assert_eq!(outcome.alternatives[0].id, 1);
    }

    #[tokio::test]
    async fn budget_exhaustion_falls_back_to_recency() {
        // A zero budget means the Llm strategy must behave like recency.
        let outcome = resolve_with(ConflictStrategy::Llm, 0, false, old_and_new()).await;
        assert_eq!(outcome.winner.id, 2);
    }

    #[test]
    fn metrics_snapshot_has_four_entries() {
        let metrics = ApexMetrics::default();
        assert_eq!(metrics.snapshot().len(), 4);
    }
}