Skip to main content

zeph_memory/graph/
conflict.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! APEX-MEM conflict resolution for cardinality-1 predicates.
5//!
6//! When multiple head edges share `(subject, canonical_relation)` and the predicate has
7//! `cardinality = 1`, this module selects one authoritative edge per the configured
8//! [`ConflictStrategy`]:
9//!
10//! - `Recency`: picks the edge with the greatest `valid_from`.
11//! - `Confidence`: picks the edge with the highest `confidence`.
12//! - `Llm`: delegates to an LLM provider with a 500 ms hard timeout, falling back to `Recency`.
13//!
14//! # Invariants
15//!
16//! - The resolver is only invoked for cardinality-1 predicates; cardinality-n predicates
17//!   pass all head edges through unchanged.
18//! - The LLM strategy respects a 500 ms mandatory timeout and a per-turn budget cap; both
19//!   exhaustion paths fall back to `Recency`.
20//! - Losing edges are optionally retained in `alternatives` (disabled by default).
21//!
22//! # Unique index vs conflict resolver
23//!
24//! The partial unique index `uq_graph_edges_active_head` prevents same-target duplicates
25//! for a cardinality-1 predicate (i.e., two rows for the exact same target entity cannot
26//! both be active). The conflict resolver handles the orthogonal case: two head edges with
27//! *different* targets for the same cardinality-1 predicate (e.g., `works_at Acme` vs
28//! `works_at Globex`).
29
30use std::time::Duration;
31
32use zeph_llm::any::AnyProvider;
33use zeph_llm::provider::{LlmProvider as _, Message, Role};
34
35use crate::error::MemoryError;
36use crate::graph::types::Edge;
37
/// Policy used to pick one winner among competing head edges of a cardinality-1 predicate.
///
/// This is a local mirror of `zeph_config::ConflictStrategy`. It lives in `zeph-memory` because
/// depending on `zeph-config` would close a crate cycle
/// (`zeph-memory` → `zeph-config` → `zeph-mcp` → `zeph-memory`); callers convert between the two.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum ConflictStrategy {
    /// The edge whose `valid_from` timestamp is newest wins.
    Recency,
    /// The edge with the largest `confidence` value wins.
    Confidence,
    /// The configured LLM provider chooses (500 ms timeout); a missing provider, an exhausted
    /// per-turn budget, a failed call or a timeout all fall back to `Recency`.
    Llm,
}
53
/// Upper bound on the number of steps taken while walking a `supersedes` chain during
/// cycle detection.
pub const SUPERSEDE_DEPTH_CAP: usize = 64;
57
58/// Output of conflict resolution for a single `(subject, canonical_relation)` group.
59pub struct ConflictResult {
60    /// The authoritative edge chosen by the resolver.
61    pub winner: Edge,
62    /// Edges that were not selected. Populated only when `retain_alternatives = true`.
63    pub alternatives: Vec<Edge>,
64}
65
66/// Conflict resolver for cardinality-1 predicate groups.
67pub struct ConflictResolver {
68    strategy: ConflictStrategy,
69    timeout: Duration,
70    /// Remaining LLM calls allowed this turn (decremented on each LLM invocation).
71    llm_budget: std::sync::atomic::AtomicI32,
72    retain_alternatives: bool,
73    /// LLM provider used when `strategy = Llm`. `None` falls back to `Recency`.
74    llm_provider: Option<AnyProvider>,
75}
76
77impl ConflictResolver {
78    /// Create a new resolver.
79    ///
80    /// - `strategy`: resolution strategy
81    /// - `timeout_ms`: LLM resolver hard timeout in milliseconds (mandatory 500 ms per spec)
82    /// - `llm_budget_per_turn`: max LLM calls per agent turn before falling back to recency
83    /// - `retain_alternatives`: when `true`, losing edges are returned in `ConflictResult::alternatives`
84    #[must_use]
85    pub fn new(
86        strategy: ConflictStrategy,
87        timeout_ms: u64,
88        llm_budget_per_turn: usize,
89        retain_alternatives: bool,
90    ) -> Self {
91        let budget = i32::try_from(llm_budget_per_turn).unwrap_or(i32::MAX);
92        Self {
93            strategy,
94            timeout: Duration::from_millis(timeout_ms),
95            llm_budget: std::sync::atomic::AtomicI32::new(budget),
96            retain_alternatives,
97            llm_provider: None,
98        }
99    }
100
101    /// Attach an LLM provider for `strategy = Llm` conflict resolution.
102    #[must_use]
103    pub fn with_llm_provider(mut self, provider: AnyProvider) -> Self {
104        self.llm_provider = Some(provider);
105        self
106    }
107
108    /// Reset the per-turn LLM budget. Call at the start of each agent turn.
109    pub fn reset_turn_budget(&self, budget: usize) {
110        let budget_i32 = i32::try_from(budget).unwrap_or(i32::MAX);
111        self.llm_budget
112            .store(budget_i32, std::sync::atomic::Ordering::Relaxed);
113    }
114
115    /// Resolve a group of head edges that share the same cardinality-1 predicate.
116    ///
117    /// `candidates` must be non-empty and all share `(source_entity_id, canonical_relation)`.
118    ///
119    /// # Errors
120    ///
121    /// Returns an error only on internal logic failures (empty candidate list).
122    pub async fn resolve(
123        &self,
124        mut candidates: Vec<Edge>,
125        metrics: &ApexMetrics,
126    ) -> Result<ConflictResult, MemoryError> {
127        tracing::debug!(target: "memory.graph.apex.conflict_resolve", candidates = candidates.len());
128
129        if candidates.is_empty() {
130            return Err(MemoryError::InvalidInput(
131                "conflict resolver called with empty candidate list".into(),
132            ));
133        }
134        if candidates.len() == 1 {
135            return Ok(ConflictResult {
136                winner: candidates.remove(0),
137                alternatives: Vec::new(),
138            });
139        }
140
141        let effective_strategy = self.effective_strategy();
142        let winner_idx = match effective_strategy {
143            ConflictStrategy::Recency => recency_winner(&candidates),
144            ConflictStrategy::Confidence => confidence_winner(&candidates),
145            ConflictStrategy::Llm => self.llm_winner(&candidates, metrics).await,
146        };
147
148        metrics
149            .conflicts_total
150            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
151
152        let winner = candidates.remove(winner_idx);
153        let alternatives = if self.retain_alternatives {
154            candidates
155        } else {
156            Vec::new()
157        };
158        Ok(ConflictResult {
159            winner,
160            alternatives,
161        })
162    }
163
164    /// Return the active strategy, falling back to `Recency` if LLM budget is exhausted.
165    fn effective_strategy(&self) -> ConflictStrategy {
166        if self.strategy == ConflictStrategy::Llm {
167            let remaining = self.llm_budget.load(std::sync::atomic::Ordering::Relaxed);
168            if remaining <= 0 {
169                return ConflictStrategy::Recency;
170            }
171        }
172        self.strategy.clone()
173    }
174
175    async fn llm_winner(&self, candidates: &[Edge], metrics: &ApexMetrics) -> usize {
176        tracing::debug!(target: "memory.graph.apex.conflict_llm", candidates = candidates.len());
177        self.llm_budget
178            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
179
180        let Some(provider) = &self.llm_provider else {
181            // No provider configured — fall back to recency without consuming timeout.
182            return recency_winner(candidates);
183        };
184
185        let prompt = build_conflict_prompt(candidates);
186        let messages = [
187            Message::from_legacy(
188                Role::System,
189                "You are a knowledge graph conflict resolver. Given a list of conflicting \
190                 edge facts indexed from 0, respond with only the index of the most \
191                 authoritative or recent fact. Output a single integer and nothing else.",
192            ),
193            Message::from_legacy(Role::User, prompt),
194        ];
195
196        let timeout = self.timeout;
197        match tokio::time::timeout(timeout, provider.chat(&messages)).await {
198            Ok(Ok(response)) => {
199                let trimmed = response.trim();
200                if let Ok(idx) = trimmed.parse::<usize>()
201                    && idx < candidates.len()
202                {
203                    return idx;
204                }
205                tracing::warn!(
206                    raw = %trimmed,
207                    "apex_mem: LLM conflict resolver returned unparseable index, falling back to recency"
208                );
209                recency_winner(candidates)
210            }
211            Ok(Err(e)) => {
212                tracing::warn!(error = %e,
213                    "apex_mem: LLM conflict resolver call failed, falling back to recency");
214                recency_winner(candidates)
215            }
216            Err(_) => {
217                metrics
218                    .llm_timeouts_total
219                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
220                tracing::warn!(
221                    "apex_mem: LLM conflict resolver timed out after {}ms, falling back to recency",
222                    timeout.as_millis()
223                );
224                recency_winner(candidates)
225            }
226        }
227    }
228}
229
230fn build_conflict_prompt(candidates: &[Edge]) -> String {
231    let mut lines = String::from("Conflicting facts for the same predicate:\n");
232    for (i, edge) in candidates.iter().enumerate() {
233        use std::fmt::Write as _;
234        let _ = writeln!(lines, "{i}: [{}] {}", edge.valid_from, edge.fact);
235    }
236    lines.push_str(
237        "\nWhich index (0-based) is the most authoritative? Respond with only the integer.",
238    );
239    lines
240}
241
242fn recency_winner(candidates: &[Edge]) -> usize {
243    candidates
244        .iter()
245        .enumerate()
246        .max_by(|(_, a), (_, b)| a.valid_from.cmp(&b.valid_from))
247        .map_or(0, |(i, _)| i)
248}
249
250fn confidence_winner(candidates: &[Edge]) -> usize {
251    candidates
252        .iter()
253        .enumerate()
254        .max_by(|(_, a), (_, b)| {
255            a.confidence
256                .partial_cmp(&b.confidence)
257                .unwrap_or(std::cmp::Ordering::Equal)
258        })
259        .map_or(0, |(i, _)| i)
260}
261
262// ── Metrics counters ─────────────────────────────────────────────────────────
263
/// Lock-free counters backing the APEX-MEM Prometheus metrics.
///
/// Shared across the store and conflict resolver via `Arc`.
#[derive(Default, Debug)]
pub struct ApexMetrics {
    /// Append-only supersede operations performed so far.
    pub supersedes_total: std::sync::atomic::AtomicU64,
    /// Conflict resolutions performed so far.
    pub conflicts_total: std::sync::atomic::AtomicU64,
    /// Times the LLM conflict resolver timed out and fell back.
    pub llm_timeouts_total: std::sync::atomic::AtomicU64,
    /// Predicates encountered without an ontology entry.
    pub unmapped_predicates_total: std::sync::atomic::AtomicU64,
}

impl ApexMetrics {
    /// Read every counter and return `(metric_name, value)` pairs in a fixed order.
    ///
    /// Each counter is loaded independently with relaxed ordering, so the values are not a
    /// single atomic snapshot across counters.
    #[must_use]
    pub fn snapshot(&self) -> Vec<(&'static str, u64)> {
        use std::sync::atomic::{AtomicU64, Ordering};

        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        vec![
            ("apex_mem_supersedes_total", read(&self.supersedes_total)),
            ("apex_mem_conflicts_total", read(&self.conflicts_total)),
            ("apex_mem_llm_timeouts_total", read(&self.llm_timeouts_total)),
            (
                "apex_mem_unmapped_predicates_total",
                read(&self.unmapped_predicates_total),
            ),
        ]
    }
}
307
308// ── Tests ────────────────────────────────────────────────────────────────────
309
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `works_at` edge from entity 1 to entity 2 whose only varying fields are
    /// `id`, `valid_from` (also used as `created_at`) and `confidence`.
    fn make_edge(id: i64, valid_from: &str, confidence: f32) -> Edge {
        Edge {
            id,
            source_entity_id: 1,
            target_entity_id: 2,
            relation: "works_at".into(),
            canonical_relation: "works_at".into(),
            fact: "fact".into(),
            confidence,
            valid_from: valid_from.to_string(),
            valid_to: None,
            created_at: valid_from.to_string(),
            expired_at: None,
            source_message_id: None,
            qdrant_point_id: None,
            edge_type: crate::graph::types::EdgeType::Semantic,
            retrieval_count: 0,
            last_retrieved_at: None,
            superseded_by: None,
            supersedes: None,
            weight: 1.0,
        }
    }

    /// Current value of the `conflicts_total` counter.
    fn conflicts(metrics: &ApexMetrics) -> u64 {
        metrics
            .conflicts_total
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    #[tokio::test]
    async fn recency_strategy_picks_newest() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, false);
        let candidates = vec![
            make_edge(1, "2026-01-01 00:00:00", 0.9),
            make_edge(2, "2026-06-01 00:00:00", 0.5),
            make_edge(3, "2026-03-01 00:00:00", 0.7),
        ];
        let result = resolver.resolve(candidates, &metrics).await.unwrap();
        assert_eq!(result.winner.id, 2, "newest valid_from wins");
    }

    #[tokio::test]
    async fn confidence_strategy_picks_highest() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Confidence, 500, 3, false);
        let candidates = vec![
            make_edge(1, "2026-01-01 00:00:00", 0.9),
            make_edge(2, "2026-06-01 00:00:00", 0.5),
            make_edge(3, "2026-03-01 00:00:00", 0.7),
        ];
        let result = resolver.resolve(candidates, &metrics).await.unwrap();
        assert_eq!(result.winner.id, 1);
    }

    #[tokio::test]
    async fn single_candidate_passes_through() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, false);
        let candidates = vec![make_edge(42, "2026-01-01 00:00:00", 0.8)];
        let result = resolver.resolve(candidates, &metrics).await.unwrap();
        assert_eq!(result.winner.id, 42);
        assert!(result.alternatives.is_empty());
    }

    #[tokio::test]
    async fn retain_alternatives_when_enabled() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, true);
        let candidates = vec![
            make_edge(1, "2026-01-01 00:00:00", 0.9),
            make_edge(2, "2026-06-01 00:00:00", 0.5),
        ];
        let result = resolver.resolve(candidates, &metrics).await.unwrap();
        assert_eq!(result.winner.id, 2);
        assert_eq!(result.alternatives.len(), 1);
        assert_eq!(result.alternatives[0].id, 1);
    }

    #[tokio::test]
    async fn budget_exhaustion_falls_back_to_recency() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Llm, 500, 0, false);
        // Budget = 0 → effective strategy is Recency immediately.
        let candidates = vec![
            make_edge(1, "2026-01-01 00:00:00", 0.9),
            make_edge(2, "2026-06-01 00:00:00", 0.5),
        ];
        let result = resolver.resolve(candidates, &metrics).await.unwrap();
        assert_eq!(result.winner.id, 2);
    }

    #[tokio::test]
    async fn empty_candidate_list_is_rejected() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, false);
        let result = resolver.resolve(Vec::new(), &metrics).await;
        assert!(matches!(result, Err(MemoryError::InvalidInput(_))));
        assert_eq!(conflicts(&metrics), 0);
    }

    #[tokio::test]
    async fn conflicts_total_counts_only_multi_candidate_groups() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Recency, 500, 3, false);

        resolver
            .resolve(vec![make_edge(1, "2026-01-01 00:00:00", 0.5)], &metrics)
            .await
            .unwrap();
        assert_eq!(conflicts(&metrics), 0, "a lone candidate is not a conflict");

        resolver
            .resolve(
                vec![
                    make_edge(1, "2026-01-01 00:00:00", 0.5),
                    make_edge(2, "2026-06-01 00:00:00", 0.5),
                ],
                &metrics,
            )
            .await
            .unwrap();
        assert_eq!(conflicts(&metrics), 1);
    }

    #[tokio::test]
    async fn llm_strategy_without_provider_falls_back_to_recency() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Llm, 500, 3, false);
        let candidates = vec![
            make_edge(1, "2026-01-01 00:00:00", 0.9),
            make_edge(2, "2026-06-01 00:00:00", 0.5),
        ];
        let result = resolver.resolve(candidates, &metrics).await.unwrap();
        assert_eq!(result.winner.id, 2);
        assert_eq!(
            metrics
                .llm_timeouts_total
                .load(std::sync::atomic::Ordering::Relaxed),
            0,
            "no provider means no call and therefore no timeout"
        );
    }

    #[tokio::test]
    async fn confidence_strategy_retains_losers_in_original_order() {
        let metrics = ApexMetrics::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Confidence, 500, 3, true);
        let candidates = vec![
            make_edge(1, "2026-01-01 00:00:00", 0.4),
            make_edge(2, "2026-06-01 00:00:00", 0.9),
            make_edge(3, "2026-03-01 00:00:00", 0.6),
        ];
        let result = resolver.resolve(candidates, &metrics).await.unwrap();
        assert_eq!(result.winner.id, 2);
        let loser_ids: Vec<i64> = result.alternatives.iter().map(|e| e.id).collect();
        assert_eq!(loser_ids, vec![1, 3]);
    }

    #[test]
    fn metrics_snapshot_has_four_entries() {
        let m = ApexMetrics::default();
        assert_eq!(m.snapshot().len(), 4);
    }
}