Skip to main content

oxirs_cluster/
conflict_resolution.rs

1//! # Advanced Conflict Resolution for Distributed Operations
2//!
3//! This module provides sophisticated conflict resolution mechanisms for distributed RDF operations,
4//! including vector clocks, operational transforms, and semantic conflict detection.
5
6use crate::raft::OxirsNodeId;
7use anyhow::Result;
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap, HashSet};
10use std::sync::Arc;
11#[cfg(test)]
12use std::time::UNIX_EPOCH;
13use std::time::{Duration, SystemTime};
14use tokio::sync::RwLock;
15
16/// Vector clock for tracking causality in distributed operations
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
18pub struct VectorClock {
19    /// Clock values per node
20    pub clocks: BTreeMap<OxirsNodeId, u64>,
21}
22
23impl VectorClock {
24    /// Create a new vector clock
25    pub fn new() -> Self {
26        Self {
27            clocks: BTreeMap::new(),
28        }
29    }
30
31    /// Increment clock for a specific node
32    pub fn increment(&mut self, node_id: OxirsNodeId) {
33        let counter = self.clocks.entry(node_id).or_insert(0);
34        *counter += 1;
35    }
36
37    /// Update clock based on received vector clock
38    pub fn update(&mut self, other: &VectorClock) {
39        for (node_id, other_time) in &other.clocks {
40            let my_time = self.clocks.entry(*node_id).or_insert(0);
41            *my_time = (*my_time).max(*other_time);
42        }
43    }
44
45    /// Check if this clock happens before another
46    pub fn happens_before(&self, other: &VectorClock) -> bool {
47        let mut all_less_or_equal = true;
48        let mut at_least_one_less = false;
49
50        // Get all node IDs from both clocks
51        let all_nodes: HashSet<_> = self.clocks.keys().chain(other.clocks.keys()).collect();
52
53        for node_id in all_nodes {
54            let my_time = self.clocks.get(node_id).unwrap_or(&0);
55            let other_time = other.clocks.get(node_id).unwrap_or(&0);
56
57            if my_time > other_time {
58                all_less_or_equal = false;
59                break;
60            }
61            if my_time < other_time {
62                at_least_one_less = true;
63            }
64        }
65
66        all_less_or_equal && at_least_one_less
67    }
68
69    /// Check if this clock is concurrent with another
70    pub fn is_concurrent(&self, other: &VectorClock) -> bool {
71        !self.happens_before(other) && !other.happens_before(self) && self != other
72    }
73
74    /// Merge two vector clocks
75    pub fn merge(&self, other: &VectorClock) -> VectorClock {
76        let mut result = self.clone();
77        result.update(other);
78        result
79    }
80}
81
82/// Distributed operation with vector clock
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
84pub struct TimestampedOperation {
85    /// Unique operation identifier
86    pub operation_id: String,
87    /// Node that originated the operation
88    pub origin_node: OxirsNodeId,
89    /// Vector clock at time of operation
90    pub vector_clock: VectorClock,
91    /// Physical timestamp
92    pub physical_time: SystemTime,
93    /// The actual RDF operation
94    pub operation: RdfOperation,
95    /// Operation priority (higher values have priority)
96    pub priority: u32,
97}
98
99/// RDF operation types for conflict resolution
100#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
101pub enum RdfOperation {
102    /// Insert a triple
103    Insert {
104        subject: String,
105        predicate: String,
106        object: String,
107        graph: Option<String>,
108    },
109    /// Delete a triple
110    Delete {
111        subject: String,
112        predicate: String,
113        object: String,
114        graph: Option<String>,
115    },
116    /// Update triple (delete old, insert new)
117    Update {
118        old_triple: (String, String, String),
119        new_triple: (String, String, String),
120        graph: Option<String>,
121    },
122    /// Clear graph or entire store
123    Clear { graph: Option<String> },
124    /// Batch operation
125    Batch { operations: Vec<RdfOperation> },
126}
127
128/// Conflict types in distributed RDF operations
129#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
130pub enum ConflictType {
131    /// Write-write conflict on same triple
132    WriteWrite {
133        operation1: TimestampedOperation,
134        operation2: TimestampedOperation,
135    },
136    /// Delete-update conflict
137    DeleteUpdate {
138        delete_op: TimestampedOperation,
139        update_op: TimestampedOperation,
140    },
141    /// Semantic conflict (violates constraints)
142    Semantic {
143        conflicting_ops: Vec<TimestampedOperation>,
144        constraint_violation: String,
145    },
146    /// Clear conflicts with any other operation
147    Clear {
148        clear_op: TimestampedOperation,
149        conflicting_ops: Vec<TimestampedOperation>,
150    },
151}
152
153/// Conflict resolution strategy
154#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
155pub enum ResolutionStrategy {
156    /// Last writer wins based on vector clock causality
157    LastWriterWins,
158    /// First writer wins (reject later conflicting operations)
159    FirstWriterWins,
160    /// Priority-based resolution
161    PriorityBased,
162    /// Node-based resolution (prefer specific nodes)
163    NodePriority {
164        node_priorities: HashMap<OxirsNodeId, u32>,
165    },
166    /// Semantic resolution with application-specific rules
167    SemanticResolution { resolution_rules: Vec<SemanticRule> },
168    /// Custom resolution function
169    Custom { resolver_name: String },
170    /// Manual resolution required
171    Manual,
172}
173
174/// Semantic resolution rule
175#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
176pub struct SemanticRule {
177    /// Rule identifier
178    pub rule_id: String,
179    /// Rule description
180    pub description: String,
181    /// Pattern to match operations
182    pub pattern: OperationPattern,
183    /// Resolution action
184    pub action: ResolutionAction,
185}
186
187/// Pattern for matching operations in semantic rules
188#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
189pub struct OperationPattern {
190    /// Subject pattern (supports wildcards)
191    pub subject_pattern: Option<String>,
192    /// Predicate pattern
193    pub predicate_pattern: Option<String>,
194    /// Object pattern
195    pub object_pattern: Option<String>,
196    /// Operation type filter
197    pub operation_type: Option<OperationType>,
198}
199
200/// Simplified operation type for pattern matching
201#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
202pub enum OperationType {
203    Insert,
204    Delete,
205    Update,
206    Clear,
207}
208
209/// Resolution action for semantic rules
210#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
211pub enum ResolutionAction {
212    /// Accept the first operation, reject others
213    AcceptFirst,
214    /// Accept the last operation, reject others
215    AcceptLast,
216    /// Accept operation with highest priority
217    AcceptHighestPriority,
218    /// Merge operations if possible
219    Merge,
220    /// Reject all conflicting operations
221    RejectAll,
222    /// Custom action
223    Custom { action_name: String },
224}
225
226/// Result of conflict resolution
227#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
228pub struct ResolutionResult {
229    /// Original conflicting operations
230    pub conflicting_operations: Vec<TimestampedOperation>,
231    /// Resolved operations to apply
232    pub resolved_operations: Vec<TimestampedOperation>,
233    /// Operations that were rejected
234    pub rejected_operations: Vec<TimestampedOperation>,
235    /// Resolution strategy used
236    pub strategy_used: ResolutionStrategy,
237    /// Additional metadata about the resolution
238    pub metadata: HashMap<String, String>,
239}
240
241/// Advanced conflict resolver
242#[derive(Debug)]
243pub struct ConflictResolver {
244    /// Default resolution strategy
245    default_strategy: ResolutionStrategy,
246    /// Strategy overrides for specific patterns
247    strategy_overrides: Vec<(OperationPattern, ResolutionStrategy)>,
248    /// Semantic rules for conflict resolution
249    semantic_rules: Vec<SemanticRule>,
250    /// Node priorities for node-based resolution
251    node_priorities: HashMap<OxirsNodeId, u32>,
252    /// Statistics and metrics
253    resolution_stats: Arc<RwLock<ResolutionStatistics>>,
254}
255
/// Counters and aggregates describing the resolver's activity.
#[derive(Clone, Debug, Default)]
pub struct ResolutionStatistics {
    /// Number of conflicts resolved so far.
    pub total_conflicts: u64,
    /// Number of conflicts seen, keyed by conflict type.
    pub conflicts_by_type: HashMap<String, u64>,
    /// Number of times each strategy was used, keyed by strategy name.
    pub strategies_used: HashMap<String, u64>,
    /// Average time spent resolving.
    pub average_resolution_time: Duration,
    /// Share of resolutions that did not fall back to manual handling.
    pub success_rate: f64,
}
270
271impl ConflictResolver {
272    /// Create a new conflict resolver
273    pub fn new(default_strategy: ResolutionStrategy) -> Self {
274        Self {
275            default_strategy,
276            strategy_overrides: Vec::new(),
277            semantic_rules: Vec::new(),
278            node_priorities: HashMap::new(),
279            resolution_stats: Arc::new(RwLock::new(ResolutionStatistics::default())),
280        }
281    }
282
283    /// Add a strategy override for specific patterns
284    pub fn add_strategy_override(
285        &mut self,
286        pattern: OperationPattern,
287        strategy: ResolutionStrategy,
288    ) {
289        self.strategy_overrides.push((pattern, strategy));
290    }
291
292    /// Add a semantic rule
293    pub fn add_semantic_rule(&mut self, rule: SemanticRule) {
294        self.semantic_rules.push(rule);
295    }
296
297    /// Set node priority for node-based resolution
298    pub fn set_node_priority(&mut self, node_id: OxirsNodeId, priority: u32) {
299        self.node_priorities.insert(node_id, priority);
300    }
301
302    /// Detect conflicts between operations
303    pub async fn detect_conflicts(
304        &self,
305        operations: &[TimestampedOperation],
306    ) -> Result<Vec<ConflictType>> {
307        let mut conflicts = Vec::new();
308
309        // Check for write-write conflicts
310        for i in 0..operations.len() {
311            for j in (i + 1)..operations.len() {
312                let op1 = &operations[i];
313                let op2 = &operations[j];
314
315                if let Some(conflict) = self.check_operation_conflict(op1, op2).await? {
316                    conflicts.push(conflict);
317                }
318            }
319        }
320
321        // Check for semantic conflicts
322        let semantic_conflicts = self.check_semantic_conflicts(operations).await?;
323        conflicts.extend(semantic_conflicts);
324
325        Ok(conflicts)
326    }
327
328    /// Resolve conflicts using configured strategies
329    pub async fn resolve_conflicts(
330        &self,
331        conflicts: &[ConflictType],
332    ) -> Result<Vec<ResolutionResult>> {
333        let start_time = std::time::Instant::now();
334        let mut results = Vec::new();
335
336        for conflict in conflicts {
337            let result = self.resolve_single_conflict(conflict).await?;
338            results.push(result);
339        }
340
341        // Update statistics
342        let resolution_time = start_time.elapsed();
343        self.update_statistics(&results, resolution_time).await;
344
345        Ok(results)
346    }
347
348    /// Check for conflict between two operations
349    async fn check_operation_conflict(
350        &self,
351        op1: &TimestampedOperation,
352        op2: &TimestampedOperation,
353    ) -> Result<Option<ConflictType>> {
354        // Skip if operations are causally ordered
355        if op1.vector_clock.happens_before(&op2.vector_clock)
356            || op2.vector_clock.happens_before(&op1.vector_clock)
357        {
358            return Ok(None);
359        }
360
361        match (&op1.operation, &op2.operation) {
362            // Write-write conflicts
363            (
364                RdfOperation::Insert {
365                    subject: s1,
366                    predicate: p1,
367                    object: o1,
368                    graph: g1,
369                },
370                RdfOperation::Insert {
371                    subject: s2,
372                    predicate: p2,
373                    object: o2,
374                    graph: g2,
375                },
376            ) => {
377                if s1 == s2 && p1 == p2 && o1 != o2 && g1 == g2 {
378                    Ok(Some(ConflictType::WriteWrite {
379                        operation1: op1.clone(),
380                        operation2: op2.clone(),
381                    }))
382                } else {
383                    Ok(None)
384                }
385            }
386
387            // Delete-update conflicts
388            (
389                RdfOperation::Delete {
390                    subject: s1,
391                    predicate: p1,
392                    object: o1,
393                    graph: g1,
394                },
395                RdfOperation::Update {
396                    old_triple: (s2, p2, o2),
397                    graph: g2,
398                    ..
399                },
400            ) => {
401                if s1 == s2 && p1 == p2 && o1 == o2 && g1 == g2 {
402                    Ok(Some(ConflictType::DeleteUpdate {
403                        delete_op: op1.clone(),
404                        update_op: op2.clone(),
405                    }))
406                } else {
407                    Ok(None)
408                }
409            }
410
411            // Clear conflicts
412            (RdfOperation::Clear { graph: _g1 }, _) | (_, RdfOperation::Clear { graph: _g1 }) => {
413                let clear_op = if matches!(op1.operation, RdfOperation::Clear { .. }) {
414                    op1.clone()
415                } else {
416                    op2.clone()
417                };
418                let other_op = if matches!(op1.operation, RdfOperation::Clear { .. }) {
419                    op2.clone()
420                } else {
421                    op1.clone()
422                };
423
424                Ok(Some(ConflictType::Clear {
425                    clear_op,
426                    conflicting_ops: vec![other_op],
427                }))
428            }
429
430            _ => Ok(None),
431        }
432    }
433
434    /// Check for semantic conflicts
435    async fn check_semantic_conflicts(
436        &self,
437        operations: &[TimestampedOperation],
438    ) -> Result<Vec<ConflictType>> {
439        let mut conflicts = Vec::new();
440
441        // Apply semantic rules
442        for rule in &self.semantic_rules {
443            let matching_ops: Vec<_> = operations
444                .iter()
445                .filter(|op| self.operation_matches_pattern(&op.operation, &rule.pattern))
446                .cloned()
447                .collect();
448
449            if matching_ops.len() > 1 {
450                conflicts.push(ConflictType::Semantic {
451                    conflicting_ops: matching_ops,
452                    constraint_violation: rule.description.clone(),
453                });
454            }
455        }
456
457        Ok(conflicts)
458    }
459
460    /// Resolve a single conflict
461    async fn resolve_single_conflict(&self, conflict: &ConflictType) -> Result<ResolutionResult> {
462        let strategy = self.select_resolution_strategy(conflict).await;
463
464        match conflict {
465            ConflictType::WriteWrite {
466                operation1,
467                operation2,
468            } => {
469                self.resolve_write_write_conflict(operation1, operation2, &strategy)
470                    .await
471            }
472            ConflictType::DeleteUpdate {
473                delete_op,
474                update_op,
475            } => {
476                self.resolve_delete_update_conflict(delete_op, update_op, &strategy)
477                    .await
478            }
479            ConflictType::Semantic {
480                conflicting_ops,
481                constraint_violation,
482            } => {
483                self.resolve_semantic_conflict(conflicting_ops, constraint_violation, &strategy)
484                    .await
485            }
486            ConflictType::Clear {
487                clear_op,
488                conflicting_ops,
489            } => {
490                self.resolve_clear_conflict(clear_op, conflicting_ops, &strategy)
491                    .await
492            }
493        }
494    }
495
496    /// Select appropriate resolution strategy
497    async fn select_resolution_strategy(&self, conflict: &ConflictType) -> ResolutionStrategy {
498        // Check for strategy overrides first
499        let operations = match conflict {
500            ConflictType::WriteWrite {
501                operation1,
502                operation2,
503            } => vec![operation1, operation2],
504            ConflictType::DeleteUpdate {
505                delete_op,
506                update_op,
507            } => vec![delete_op, update_op],
508            ConflictType::Semantic {
509                conflicting_ops, ..
510            } => conflicting_ops.iter().collect(),
511            ConflictType::Clear {
512                clear_op,
513                conflicting_ops,
514            } => {
515                let mut ops = vec![clear_op];
516                ops.extend(conflicting_ops.iter());
517                ops
518            }
519        };
520
521        // Check strategy overrides
522        for op in &operations {
523            for (pattern, strategy) in &self.strategy_overrides {
524                if self.operation_matches_pattern(&op.operation, pattern) {
525                    return strategy.clone();
526                }
527            }
528        }
529
530        // Use default strategy
531        self.default_strategy.clone()
532    }
533
534    /// Resolve write-write conflict
535    async fn resolve_write_write_conflict(
536        &self,
537        op1: &TimestampedOperation,
538        op2: &TimestampedOperation,
539        strategy: &ResolutionStrategy,
540    ) -> Result<ResolutionResult> {
541        let (resolved, rejected) = match strategy {
542            ResolutionStrategy::LastWriterWins => {
543                if op1.physical_time >= op2.physical_time {
544                    (vec![op1.clone()], vec![op2.clone()])
545                } else {
546                    (vec![op2.clone()], vec![op1.clone()])
547                }
548            }
549            ResolutionStrategy::FirstWriterWins => {
550                if op1.physical_time <= op2.physical_time {
551                    (vec![op1.clone()], vec![op2.clone()])
552                } else {
553                    (vec![op2.clone()], vec![op1.clone()])
554                }
555            }
556            ResolutionStrategy::PriorityBased => {
557                if op1.priority >= op2.priority {
558                    (vec![op1.clone()], vec![op2.clone()])
559                } else {
560                    (vec![op2.clone()], vec![op1.clone()])
561                }
562            }
563            ResolutionStrategy::NodePriority { node_priorities } => {
564                let priority1 = node_priorities.get(&op1.origin_node).unwrap_or(&0);
565                let priority2 = node_priorities.get(&op2.origin_node).unwrap_or(&0);
566
567                if priority1 >= priority2 {
568                    (vec![op1.clone()], vec![op2.clone()])
569                } else {
570                    (vec![op2.clone()], vec![op1.clone()])
571                }
572            }
573            _ => {
574                // Default to last writer wins
575                if op1.physical_time >= op2.physical_time {
576                    (vec![op1.clone()], vec![op2.clone()])
577                } else {
578                    (vec![op2.clone()], vec![op1.clone()])
579                }
580            }
581        };
582
583        Ok(ResolutionResult {
584            conflicting_operations: vec![op1.clone(), op2.clone()],
585            resolved_operations: resolved,
586            rejected_operations: rejected,
587            strategy_used: strategy.clone(),
588            metadata: HashMap::new(),
589        })
590    }
591
592    /// Resolve delete-update conflict
593    async fn resolve_delete_update_conflict(
594        &self,
595        delete_op: &TimestampedOperation,
596        update_op: &TimestampedOperation,
597        strategy: &ResolutionStrategy,
598    ) -> Result<ResolutionResult> {
599        let (resolved, rejected) = match strategy {
600            ResolutionStrategy::LastWriterWins => {
601                if delete_op.physical_time >= update_op.physical_time {
602                    (vec![delete_op.clone()], vec![update_op.clone()])
603                } else {
604                    (vec![update_op.clone()], vec![delete_op.clone()])
605                }
606            }
607            _ => {
608                // Default: delete wins
609                (vec![delete_op.clone()], vec![update_op.clone()])
610            }
611        };
612
613        Ok(ResolutionResult {
614            conflicting_operations: vec![delete_op.clone(), update_op.clone()],
615            resolved_operations: resolved,
616            rejected_operations: rejected,
617            strategy_used: strategy.clone(),
618            metadata: HashMap::new(),
619        })
620    }
621
622    /// Resolve semantic conflict
623    async fn resolve_semantic_conflict(
624        &self,
625        conflicting_ops: &[TimestampedOperation],
626        _constraint_violation: &str,
627        strategy: &ResolutionStrategy,
628    ) -> Result<ResolutionResult> {
629        let (resolved, rejected) = match strategy {
630            ResolutionStrategy::SemanticResolution { resolution_rules } => {
631                // Apply semantic rules
632                let mut resolved = Vec::new();
633                let mut rejected = conflicting_ops.to_vec();
634
635                for rule in resolution_rules {
636                    match &rule.action {
637                        ResolutionAction::AcceptFirst => {
638                            if let Some(first_op) = conflicting_ops.first() {
639                                resolved = vec![first_op.clone()];
640                                rejected = conflicting_ops[1..].to_vec();
641                            }
642                            break;
643                        }
644                        ResolutionAction::AcceptLast => {
645                            if let Some(last_op) = conflicting_ops.last() {
646                                resolved = vec![last_op.clone()];
647                                rejected = conflicting_ops[..conflicting_ops.len() - 1].to_vec();
648                            }
649                            break;
650                        }
651                        ResolutionAction::AcceptHighestPriority => {
652                            if let Some(highest_priority_op) =
653                                conflicting_ops.iter().max_by_key(|op| op.priority)
654                            {
655                                resolved = vec![highest_priority_op.clone()];
656                                rejected = conflicting_ops
657                                    .iter()
658                                    .filter(|op| {
659                                        op.operation_id != highest_priority_op.operation_id
660                                    })
661                                    .cloned()
662                                    .collect();
663                            }
664                            break;
665                        }
666                        ResolutionAction::RejectAll => {
667                            resolved = Vec::new();
668                            rejected = conflicting_ops.to_vec();
669                            break;
670                        }
671                        _ => continue,
672                    }
673                }
674
675                (resolved, rejected)
676            }
677            _ => {
678                // Default: reject all
679                (Vec::new(), conflicting_ops.to_vec())
680            }
681        };
682
683        Ok(ResolutionResult {
684            conflicting_operations: conflicting_ops.to_vec(),
685            resolved_operations: resolved,
686            rejected_operations: rejected,
687            strategy_used: strategy.clone(),
688            metadata: HashMap::new(),
689        })
690    }
691
692    /// Resolve clear conflict
693    async fn resolve_clear_conflict(
694        &self,
695        clear_op: &TimestampedOperation,
696        conflicting_ops: &[TimestampedOperation],
697        _strategy: &ResolutionStrategy,
698    ) -> Result<ResolutionResult> {
699        // Clear operation typically wins
700        Ok(ResolutionResult {
701            conflicting_operations: {
702                let mut ops = vec![clear_op.clone()];
703                ops.extend(conflicting_ops.iter().cloned());
704                ops
705            },
706            resolved_operations: vec![clear_op.clone()],
707            rejected_operations: conflicting_ops.to_vec(),
708            strategy_used: ResolutionStrategy::FirstWriterWins, // Clear always wins
709            metadata: HashMap::new(),
710        })
711    }
712
713    /// Check if operation matches pattern
714    fn operation_matches_pattern(
715        &self,
716        operation: &RdfOperation,
717        pattern: &OperationPattern,
718    ) -> bool {
719        // Check operation type
720        if let Some(expected_type) = &pattern.operation_type {
721            let actual_type = match operation {
722                RdfOperation::Insert { .. } => OperationType::Insert,
723                RdfOperation::Delete { .. } => OperationType::Delete,
724                RdfOperation::Update { .. } => OperationType::Update,
725                RdfOperation::Clear { .. } => OperationType::Clear,
726                RdfOperation::Batch { .. } => return false, // Batch operations don't match simple patterns
727            };
728
729            if &actual_type != expected_type {
730                return false;
731            }
732        }
733
734        // Check triple patterns
735        match operation {
736            RdfOperation::Insert {
737                subject,
738                predicate,
739                object,
740                ..
741            }
742            | RdfOperation::Delete {
743                subject,
744                predicate,
745                object,
746                ..
747            } => self.check_triple_pattern(subject, predicate, object, pattern),
748            RdfOperation::Update {
749                new_triple: (subject, predicate, object),
750                ..
751            } => self.check_triple_pattern(subject, predicate, object, pattern),
752            _ => true, // Clear and batch operations match by default
753        }
754    }
755
756    /// Check if triple matches pattern
757    fn check_triple_pattern(
758        &self,
759        subject: &str,
760        predicate: &str,
761        object: &str,
762        pattern: &OperationPattern,
763    ) -> bool {
764        if let Some(subject_pattern) = &pattern.subject_pattern {
765            if !self.matches_wildcard_pattern(subject, subject_pattern) {
766                return false;
767            }
768        }
769
770        if let Some(predicate_pattern) = &pattern.predicate_pattern {
771            if !self.matches_wildcard_pattern(predicate, predicate_pattern) {
772                return false;
773            }
774        }
775
776        if let Some(object_pattern) = &pattern.object_pattern {
777            if !self.matches_wildcard_pattern(object, object_pattern) {
778                return false;
779            }
780        }
781
782        true
783    }
784
785    /// Check if string matches wildcard pattern
786    fn matches_wildcard_pattern(&self, value: &str, pattern: &str) -> bool {
787        if pattern == "*" {
788            return true;
789        }
790
791        // Simple wildcard matching (can be enhanced)
792        if pattern.contains('*') {
793            let parts: Vec<_> = pattern.split('*').collect();
794            let mut value_pos = 0;
795
796            for (i, part) in parts.iter().enumerate() {
797                if part.is_empty() {
798                    continue;
799                }
800
801                if i == 0 {
802                    // First part must match from the beginning
803                    if !value[value_pos..].starts_with(part) {
804                        return false;
805                    }
806                    value_pos += part.len();
807                } else if i == parts.len() - 1 {
808                    // Last part must match at the end
809                    return value[value_pos..].ends_with(part);
810                } else {
811                    // Middle part must be found somewhere
812                    if let Some(pos) = value[value_pos..].find(part) {
813                        value_pos += pos + part.len();
814                    } else {
815                        return false;
816                    }
817                }
818            }
819
820            true
821        } else {
822            value == pattern
823        }
824    }
825
826    /// Update resolution statistics
827    async fn update_statistics(&self, results: &[ResolutionResult], resolution_time: Duration) {
828        let mut stats = self.resolution_stats.write().await;
829
830        stats.total_conflicts += results.len() as u64;
831
832        // Update average resolution time
833        let total_time = stats.average_resolution_time.as_nanos() * stats.total_conflicts as u128
834            + resolution_time.as_nanos();
835        stats.average_resolution_time = Duration::from_nanos(
836            (total_time / (stats.total_conflicts + results.len() as u64) as u128) as u64,
837        );
838
839        // Update strategy usage
840        for result in results {
841            let strategy_name = format!("{:?}", result.strategy_used);
842            *stats.strategies_used.entry(strategy_name).or_insert(0) += 1;
843        }
844
845        // Calculate success rate
846        let manual_resolutions = results
847            .iter()
848            .filter(|r| matches!(r.strategy_used, ResolutionStrategy::Manual))
849            .count();
850        let total_resolutions = results.len();
851        stats.success_rate = if total_resolutions > 0 {
852            1.0 - (manual_resolutions as f64 / total_resolutions as f64)
853        } else {
854            1.0
855        };
856    }
857
858    /// Get resolution statistics
859    pub async fn get_statistics(&self) -> ResolutionStatistics {
860        self.resolution_stats.read().await.clone()
861    }
862}
863
864impl Default for VectorClock {
865    fn default() -> Self {
866        Self::new()
867    }
868}
869
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps `operation` in a `TimestampedOperation` with an empty vector
    /// clock, the Unix epoch as physical time and priority zero.
    fn create_test_operation(
        id: &str,
        node: OxirsNodeId,
        operation: RdfOperation,
    ) -> TimestampedOperation {
        TimestampedOperation {
            operation_id: id.to_string(),
            origin_node: node,
            vector_clock: VectorClock::new(),
            physical_time: UNIX_EPOCH,
            operation,
            priority: 0,
        }
    }

    /// Insert of `(s1, p1, object)` without a graph.
    fn insert_for_s1_p1(object: &str) -> RdfOperation {
        RdfOperation::Insert {
            subject: "s1".to_string(),
            predicate: "p1".to_string(),
            object: object.to_string(),
            graph: None,
        }
    }

    #[test]
    fn test_vector_clock_operations() {
        let mut left = VectorClock::new();
        let mut right = VectorClock::new();

        left.increment(1);
        left.increment(1);
        right.increment(2);

        // Each clock is ahead on its own node, so they are concurrent.
        assert!(!left.happens_before(&right));
        assert!(!right.happens_before(&left));
        assert!(left.is_concurrent(&right));

        // After absorbing `left`, `right` dominates it.
        right.update(&left);
        assert!(left.happens_before(&right));
    }

    #[tokio::test]
    async fn test_write_write_conflict_detection() {
        let resolver = ConflictResolver::new(ResolutionStrategy::LastWriterWins);

        let operations = [
            create_test_operation("op1", 1, insert_for_s1_p1("o1")),
            create_test_operation("op2", 2, insert_for_s1_p1("o2")),
        ];

        let conflicts = resolver.detect_conflicts(&operations).await.unwrap();
        assert_eq!(conflicts.len(), 1);
        assert!(matches!(conflicts[0], ConflictType::WriteWrite { .. }));
    }

    #[tokio::test]
    async fn test_conflict_resolution() {
        let resolver = ConflictResolver::new(ResolutionStrategy::LastWriterWins);

        let earlier = TimestampedOperation {
            physical_time: UNIX_EPOCH + Duration::from_secs(1),
            ..create_test_operation("op1", 1, insert_for_s1_p1("o1"))
        };
        let later = TimestampedOperation {
            physical_time: UNIX_EPOCH + Duration::from_secs(2),
            ..create_test_operation("op2", 2, insert_for_s1_p1("o2"))
        };

        let conflict = ConflictType::WriteWrite {
            operation1: earlier,
            operation2: later,
        };

        let results = resolver.resolve_conflicts(&[conflict]).await.unwrap();
        assert_eq!(results.len(), 1);

        let outcome = &results[0];
        assert_eq!(outcome.resolved_operations.len(), 1);
        assert_eq!(outcome.resolved_operations[0].operation_id, "op2"); // Later operation wins
        assert_eq!(outcome.rejected_operations.len(), 1);
        assert_eq!(outcome.rejected_operations[0].operation_id, "op1");
    }

    #[test]
    fn test_wildcard_pattern_matching() {
        let resolver = ConflictResolver::new(ResolutionStrategy::LastWriterWins);

        // (pattern, expected result when matched against "hello")
        let cases = [
            ("*", true),
            ("hello", true),
            ("h*o", true),
            ("*lo", true),
            ("he*", true),
            ("world", false),
            ("h*x", false),
        ];

        for (pattern, expected) in cases.iter() {
            assert_eq!(
                resolver.matches_wildcard_pattern("hello", pattern),
                *expected,
                "pattern {:?}",
                pattern
            );
        }
    }
}