Skip to main content

zeph_memory/graph/
conflict.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! APEX-MEM conflict resolution for cardinality-1 predicates.
5//!
6//! When multiple head edges share `(subject, canonical_relation)` and the predicate has
7//! `cardinality = 1`, this module selects one authoritative edge per the configured
8//! [`ConflictStrategy`]:
9//!
10//! - `Recency`: picks the edge with the greatest `valid_from`.
11//! - `Confidence`: picks the edge with the highest `confidence`.
12//! - `Llm`: delegates to an LLM provider with a 500 ms hard timeout, falling back to `Recency`.
13//!
14//! # Invariants
15//!
16//! - The resolver is only invoked for cardinality-1 predicates; cardinality-n predicates
17//!   pass all head edges through unchanged.
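//! - The LLM strategy enforces a hard timeout (500 ms per spec) and a per-turn budget cap.
//!   A timeout, an exhausted budget, a provider error, an unparseable reply, or a missing
//!   provider all fall back to `Recency`.
//! - Losing edges are retained in `alternatives` only when the resolver is built with
//!   `retain_alternatives = true` (disabled by default).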
21//!
22//! # Unique index vs conflict resolver
23//!
24//! The partial unique index `uq_graph_edges_active_head` prevents same-target duplicates
25//! for a cardinality-1 predicate (i.e., two rows for the exact same target entity cannot
26//! both be active). The conflict resolver handles the orthogonal case: two head edges with
27//! *different* targets for the same cardinality-1 predicate (e.g., `works_at Acme` vs
28//! `works_at Globex`).
29
30use std::time::Duration;
31
32use zeph_llm::any::AnyProvider;
33use zeph_llm::provider::{LlmProvider as _, Message, Role};
34
35use crate::error::MemoryError;
36use crate::graph::types::Edge;
37
/// Strategy used to pick a single winner among conflicting cardinality-1 head edges.
///
/// This is a local mirror of `zeph_config::ConflictStrategy`. It is duplicated here instead of
/// imported because depending on `zeph-config` would close the cycle
/// `zeph-memory` → `zeph-config` → `zeph-mcp` → `zeph-memory`. `zeph-config` keeps its own
/// copy and callers convert between the two.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConflictStrategy {
    /// The edge whose `valid_from` timestamp is newest wins.
    Recency,
    /// The edge with the largest `confidence` score wins.
    Confidence,
    /// The configured LLM provider picks the winner (500 ms timeout); failures fall back to
    /// `Recency`.
    Llm,
}
52
/// Upper bound on the number of `supersedes` links followed when walking a supersede chain
/// during cycle detection.
pub const SUPERSEDE_DEPTH_CAP: usize = 64;
56
57/// Output of conflict resolution for a single `(subject, canonical_relation)` group.
58pub struct ConflictResult {
59    /// The authoritative edge chosen by the resolver.
60    pub winner: Edge,
61    /// Edges that were not selected. Populated only when `retain_alternatives = true`.
62    pub alternatives: Vec<Edge>,
63}
64
65/// Conflict resolver for cardinality-1 predicate groups.
66pub struct ConflictResolver {
67    strategy: ConflictStrategy,
68    timeout: Duration,
69    /// Remaining LLM calls allowed this turn (decremented on each LLM invocation).
70    llm_budget: std::sync::atomic::AtomicI32,
71    retain_alternatives: bool,
72    /// LLM provider used when `strategy = Llm`. `None` falls back to `Recency`.
73    llm_provider: Option<AnyProvider>,
74}
75
76impl ConflictResolver {
77    /// Create a new resolver.
78    ///
79    /// - `strategy`: resolution strategy
80    /// - `timeout_ms`: LLM resolver hard timeout in milliseconds (mandatory 500 ms per spec)
81    /// - `llm_budget_per_turn`: max LLM calls per agent turn before falling back to recency
82    /// - `retain_alternatives`: when `true`, losing edges are returned in `ConflictResult::alternatives`
83    #[must_use]
84    pub fn new(
85        strategy: ConflictStrategy,
86        timeout_ms: u64,
87        llm_budget_per_turn: usize,
88        retain_alternatives: bool,
89    ) -> Self {
90        let budget = i32::try_from(llm_budget_per_turn).unwrap_or(i32::MAX);
91        Self {
92            strategy,
93            timeout: Duration::from_millis(timeout_ms),
94            llm_budget: std::sync::atomic::AtomicI32::new(budget),
95            retain_alternatives,
96            llm_provider: None,
97        }
98    }
99
100    /// Attach an LLM provider for `strategy = Llm` conflict resolution.
101    #[must_use]
102    pub fn with_llm_provider(mut self, provider: AnyProvider) -> Self {
103        self.llm_provider = Some(provider);
104        self
105    }
106
107    /// Reset the per-turn LLM budget. Call at the start of each agent turn.
108    pub fn reset_turn_budget(&self, budget: usize) {
109        let budget_i32 = i32::try_from(budget).unwrap_or(i32::MAX);
110        self.llm_budget
111            .store(budget_i32, std::sync::atomic::Ordering::Relaxed);
112    }
113
114    /// Resolve a group of head edges that share the same cardinality-1 predicate.
115    ///
116    /// `candidates` must be non-empty and all share `(source_entity_id, canonical_relation)`.
117    ///
118    /// # Errors
119    ///
120    /// Returns an error only on internal logic failures (empty candidate list).
121    pub async fn resolve(
122        &self,
123        mut candidates: Vec<Edge>,
124        metrics: &ApexMetrics,
125    ) -> Result<ConflictResult, MemoryError> {
126        tracing::debug!(target: "memory.graph.apex.conflict_resolve", candidates = candidates.len());
127
128        if candidates.is_empty() {
129            return Err(MemoryError::InvalidInput(
130                "conflict resolver called with empty candidate list".into(),
131            ));
132        }
133        if candidates.len() == 1 {
134            return Ok(ConflictResult {
135                winner: candidates.remove(0),
136                alternatives: Vec::new(),
137            });
138        }
139
140        let effective_strategy = self.effective_strategy();
141        let winner_idx = match effective_strategy {
142            ConflictStrategy::Recency => recency_winner(&candidates),
143            ConflictStrategy::Confidence => confidence_winner(&candidates),
144            ConflictStrategy::Llm => self.llm_winner(&candidates, metrics).await,
145        };
146
147        metrics
148            .conflicts_total
149            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
150
151        let winner = candidates.remove(winner_idx);
152        let alternatives = if self.retain_alternatives {
153            candidates
154        } else {
155            Vec::new()
156        };
157        Ok(ConflictResult {
158            winner,
159            alternatives,
160        })
161    }
162
163    /// Return the active strategy, falling back to `Recency` if LLM budget is exhausted.
164    fn effective_strategy(&self) -> ConflictStrategy {
165        if self.strategy == ConflictStrategy::Llm {
166            let remaining = self.llm_budget.load(std::sync::atomic::Ordering::Relaxed);
167            if remaining <= 0 {
168                return ConflictStrategy::Recency;
169            }
170        }
171        self.strategy.clone()
172    }
173
174    async fn llm_winner(&self, candidates: &[Edge], metrics: &ApexMetrics) -> usize {
175        tracing::debug!(target: "memory.graph.apex.conflict_llm", candidates = candidates.len());
176        self.llm_budget
177            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
178
179        let Some(provider) = &self.llm_provider else {
180            // No provider configured — fall back to recency without consuming timeout.
181            return recency_winner(candidates);
182        };
183
184        let prompt = build_conflict_prompt(candidates);
185        let messages = [
186            Message::from_legacy(
187                Role::System,
188                "You are a knowledge graph conflict resolver. Given a list of conflicting \
189                 edge facts indexed from 0, respond with only the index of the most \
190                 authoritative or recent fact. Output a single integer and nothing else.",
191            ),
192            Message::from_legacy(Role::User, prompt),
193        ];
194
195        let timeout = self.timeout;
196        match tokio::time::timeout(timeout, provider.chat(&messages)).await {
197            Ok(Ok(response)) => {
198                let trimmed = response.trim();
199                if let Ok(idx) = trimmed.parse::<usize>()
200                    && idx < candidates.len()
201                {
202                    return idx;
203                }
204                tracing::warn!(
205                    raw = %trimmed,
206                    "apex_mem: LLM conflict resolver returned unparseable index, falling back to recency"
207                );
208                recency_winner(candidates)
209            }
210            Ok(Err(e)) => {
211                tracing::warn!(error = %e,
212                    "apex_mem: LLM conflict resolver call failed, falling back to recency");
213                recency_winner(candidates)
214            }
215            Err(_) => {
216                metrics
217                    .llm_timeouts_total
218                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
219                tracing::warn!(
220                    "apex_mem: LLM conflict resolver timed out after {}ms, falling back to recency",
221                    timeout.as_millis()
222                );
223                recency_winner(candidates)
224            }
225        }
226    }
227}
228
229fn build_conflict_prompt(candidates: &[Edge]) -> String {
230    let mut lines = String::from("Conflicting facts for the same predicate:\n");
231    for (i, edge) in candidates.iter().enumerate() {
232        use std::fmt::Write as _;
233        let _ = writeln!(lines, "{i}: [{}] {}", edge.valid_from, edge.fact);
234    }
235    lines.push_str(
236        "\nWhich index (0-based) is the most authoritative? Respond with only the integer.",
237    );
238    lines
239}
240
241fn recency_winner(candidates: &[Edge]) -> usize {
242    candidates
243        .iter()
244        .enumerate()
245        .max_by(|(_, a), (_, b)| a.valid_from.cmp(&b.valid_from))
246        .map_or(0, |(i, _)| i)
247}
248
249fn confidence_winner(candidates: &[Edge]) -> usize {
250    candidates
251        .iter()
252        .enumerate()
253        .max_by(|(_, a), (_, b)| {
254            a.confidence
255                .partial_cmp(&b.confidence)
256                .unwrap_or(std::cmp::Ordering::Equal)
257        })
258        .map_or(0, |(i, _)| i)
259}
260
261// ── Metrics counters ─────────────────────────────────────────────────────────
262
/// Atomic counters backing the APEX-MEM Prometheus metrics.
///
/// Shared across the store and the conflict resolver via `Arc`.
#[derive(Debug, Default)]
pub struct ApexMetrics {
    /// Append-only supersede operations performed.
    pub supersedes_total: std::sync::atomic::AtomicU64,
    /// Conflict resolution operations performed.
    pub conflicts_total: std::sync::atomic::AtomicU64,
    /// Times the LLM conflict resolver timed out and fell back.
    pub llm_timeouts_total: std::sync::atomic::AtomicU64,
    /// Predicates encountered with no ontology entry (unmapped).
    pub unmapped_predicates_total: std::sync::atomic::AtomicU64,
}

impl ApexMetrics {
    /// Read every counter and return `(metric_name, value)` pairs.
    ///
    /// The order is fixed: supersedes, conflicts, LLM timeouts, unmapped predicates.
    /// Each counter is loaded independently with relaxed ordering, so the result is not an
    /// atomic view across counters.
    #[must_use]
    pub fn snapshot(&self) -> Vec<(&'static str, u64)> {
        use std::sync::atomic::{AtomicU64, Ordering};

        let counters: [(&'static str, &AtomicU64); 4] = [
            ("apex_mem_supersedes_total", &self.supersedes_total),
            ("apex_mem_conflicts_total", &self.conflicts_total),
            ("apex_mem_llm_timeouts_total", &self.llm_timeouts_total),
            (
                "apex_mem_unmapped_predicates_total",
                &self.unmapped_predicates_total,
            ),
        ];
        counters
            .into_iter()
            .map(|(name, counter)| (name, counter.load(Ordering::Relaxed)))
            .collect()
    }
}
306
307// ── Tests ────────────────────────────────────────────────────────────────────
308
#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering;

    use super::*;

    /// Build a `works_at` edge from entity 1 to entity 2 with the given id, timestamp and
    /// confidence.
    fn make_edge(id: i64, valid_from: &str, confidence: f32) -> Edge {
        Edge {
            id,
            source_entity_id: 1,
            target_entity_id: 2,
            relation: "works_at".into(),
            canonical_relation: "works_at".into(),
            fact: "fact".into(),
            confidence,
            valid_from: valid_from.to_string(),
            valid_to: None,
            created_at: valid_from.to_string(),
            expired_at: None,
            source_message_id: None,
            qdrant_point_id: None,
            edge_type: crate::graph::types::EdgeType::Semantic,
            retrieval_count: 0,
            last_retrieved_at: None,
            superseded_by: None,
            supersedes: None,
        }
    }

    /// Three conflicting edges: id 1 is the most confident, id 2 is the newest.
    fn three_candidates() -> Vec<Edge> {
        vec![
            make_edge(1, "2026-01-01 00:00:00", 0.9),
            make_edge(2, "2026-06-01 00:00:00", 0.5),
            make_edge(3, "2026-03-01 00:00:00", 0.7),
        ]
    }

    #[tokio::test]
    async fn recency_strategy_picks_newest() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, false);
        let result = resolver.resolve(three_candidates(), &metrics).await.unwrap();
        assert_eq!(result.winner.id, 2, "newest valid_from wins");
    }

    #[tokio::test]
    async fn confidence_strategy_picks_highest() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Confidence, 500, 3, false);
        let result = resolver.resolve(three_candidates(), &metrics).await.unwrap();
        assert_eq!(result.winner.id, 1);
    }

    #[tokio::test]
    async fn single_candidate_passes_through() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, false);
        let candidates = vec![make_edge(42, "2026-01-01 00:00:00", 0.8)];
        let result = resolver.resolve(candidates, &metrics).await.unwrap();
        assert_eq!(result.winner.id, 42);
        assert!(result.alternatives.is_empty());
    }

    #[tokio::test]
    async fn retain_alternatives_when_enabled() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, true);
        let candidates = vec![
            make_edge(1, "2026-01-01 00:00:00", 0.9),
            make_edge(2, "2026-06-01 00:00:00", 0.5),
        ];
        let result = resolver.resolve(candidates, &metrics).await.unwrap();
        assert_eq!(result.winner.id, 2);
        assert_eq!(result.alternatives.len(), 1);
        assert_eq!(result.alternatives[0].id, 1);
    }

    #[tokio::test]
    async fn budget_exhaustion_falls_back_to_recency() {
        let metrics = ApexMetrics::default();
        // Budget = 0 → the LLM is never consulted; recency decides.
        let resolver = ConflictResolver::new(ConflictStrategy::Llm, 500, 0, false);
        let candidates = vec![
            make_edge(1, "2026-01-01 00:00:00", 0.9),
            make_edge(2, "2026-06-01 00:00:00", 0.5),
        ];
        let result = resolver.resolve(candidates, &metrics).await.unwrap();
        assert_eq!(result.winner.id, 2);
    }

    #[tokio::test]
    async fn llm_strategy_without_provider_falls_back_to_recency() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Llm, 500, 3, false);
        let result = resolver.resolve(three_candidates(), &metrics).await.unwrap();
        assert_eq!(result.winner.id, 2);
        assert_eq!(metrics.llm_timeouts_total.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn empty_candidates_are_rejected() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, false);
        assert!(resolver.resolve(Vec::new(), &metrics).await.is_err());
        assert_eq!(metrics.conflicts_total.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn conflicts_counted_only_for_real_conflicts() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, false);
        resolver
            .resolve(vec![make_edge(7, "2026-01-01 00:00:00", 0.5)], &metrics)
            .await
            .unwrap();
        assert_eq!(metrics.conflicts_total.load(Ordering::Relaxed), 0);
        resolver.resolve(three_candidates(), &metrics).await.unwrap();
        assert_eq!(metrics.conflicts_total.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn metrics_snapshot_has_four_entries() {
        let m = ApexMetrics::default();
        assert_eq!(m.snapshot().len(), 4);
    }
}