1use crate::raft::OxirsNodeId;
7use anyhow::Result;
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap, HashSet};
10use std::sync::Arc;
11#[cfg(test)]
12use std::time::UNIX_EPOCH;
13use std::time::{Duration, SystemTime};
14use tokio::sync::RwLock;
15
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
18pub struct VectorClock {
19 pub clocks: BTreeMap<OxirsNodeId, u64>,
21}
22
23impl VectorClock {
24 pub fn new() -> Self {
26 Self {
27 clocks: BTreeMap::new(),
28 }
29 }
30
31 pub fn increment(&mut self, node_id: OxirsNodeId) {
33 let counter = self.clocks.entry(node_id).or_insert(0);
34 *counter += 1;
35 }
36
37 pub fn update(&mut self, other: &VectorClock) {
39 for (node_id, other_time) in &other.clocks {
40 let my_time = self.clocks.entry(*node_id).or_insert(0);
41 *my_time = (*my_time).max(*other_time);
42 }
43 }
44
45 pub fn happens_before(&self, other: &VectorClock) -> bool {
47 let mut all_less_or_equal = true;
48 let mut at_least_one_less = false;
49
50 let all_nodes: HashSet<_> = self.clocks.keys().chain(other.clocks.keys()).collect();
52
53 for node_id in all_nodes {
54 let my_time = self.clocks.get(node_id).unwrap_or(&0);
55 let other_time = other.clocks.get(node_id).unwrap_or(&0);
56
57 if my_time > other_time {
58 all_less_or_equal = false;
59 break;
60 }
61 if my_time < other_time {
62 at_least_one_less = true;
63 }
64 }
65
66 all_less_or_equal && at_least_one_less
67 }
68
69 pub fn is_concurrent(&self, other: &VectorClock) -> bool {
71 !self.happens_before(other) && !other.happens_before(self) && self != other
72 }
73
74 pub fn merge(&self, other: &VectorClock) -> VectorClock {
76 let mut result = self.clone();
77 result.update(other);
78 result
79 }
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
84pub struct TimestampedOperation {
85 pub operation_id: String,
87 pub origin_node: OxirsNodeId,
89 pub vector_clock: VectorClock,
91 pub physical_time: SystemTime,
93 pub operation: RdfOperation,
95 pub priority: u32,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
101pub enum RdfOperation {
102 Insert {
104 subject: String,
105 predicate: String,
106 object: String,
107 graph: Option<String>,
108 },
109 Delete {
111 subject: String,
112 predicate: String,
113 object: String,
114 graph: Option<String>,
115 },
116 Update {
118 old_triple: (String, String, String),
119 new_triple: (String, String, String),
120 graph: Option<String>,
121 },
122 Clear { graph: Option<String> },
124 Batch { operations: Vec<RdfOperation> },
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
130pub enum ConflictType {
131 WriteWrite {
133 operation1: TimestampedOperation,
134 operation2: TimestampedOperation,
135 },
136 DeleteUpdate {
138 delete_op: TimestampedOperation,
139 update_op: TimestampedOperation,
140 },
141 Semantic {
143 conflicting_ops: Vec<TimestampedOperation>,
144 constraint_violation: String,
145 },
146 Clear {
148 clear_op: TimestampedOperation,
149 conflicting_ops: Vec<TimestampedOperation>,
150 },
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
155pub enum ResolutionStrategy {
156 LastWriterWins,
158 FirstWriterWins,
160 PriorityBased,
162 NodePriority {
164 node_priorities: HashMap<OxirsNodeId, u32>,
165 },
166 SemanticResolution { resolution_rules: Vec<SemanticRule> },
168 Custom { resolver_name: String },
170 Manual,
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
176pub struct SemanticRule {
177 pub rule_id: String,
179 pub description: String,
181 pub pattern: OperationPattern,
183 pub action: ResolutionAction,
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
189pub struct OperationPattern {
190 pub subject_pattern: Option<String>,
192 pub predicate_pattern: Option<String>,
194 pub object_pattern: Option<String>,
196 pub operation_type: Option<OperationType>,
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
202pub enum OperationType {
203 Insert,
204 Delete,
205 Update,
206 Clear,
207}
208
209#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
211pub enum ResolutionAction {
212 AcceptFirst,
214 AcceptLast,
216 AcceptHighestPriority,
218 Merge,
220 RejectAll,
222 Custom { action_name: String },
224}
225
226#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
228pub struct ResolutionResult {
229 pub conflicting_operations: Vec<TimestampedOperation>,
231 pub resolved_operations: Vec<TimestampedOperation>,
233 pub rejected_operations: Vec<TimestampedOperation>,
235 pub strategy_used: ResolutionStrategy,
237 pub metadata: HashMap<String, String>,
239}
240
241#[derive(Debug)]
243pub struct ConflictResolver {
244 default_strategy: ResolutionStrategy,
246 strategy_overrides: Vec<(OperationPattern, ResolutionStrategy)>,
248 semantic_rules: Vec<SemanticRule>,
250 node_priorities: HashMap<OxirsNodeId, u32>,
252 resolution_stats: Arc<RwLock<ResolutionStatistics>>,
254}
255
256#[derive(Debug, Default, Clone)]
258pub struct ResolutionStatistics {
259 pub total_conflicts: u64,
261 pub conflicts_by_type: HashMap<String, u64>,
263 pub strategies_used: HashMap<String, u64>,
265 pub average_resolution_time: Duration,
267 pub success_rate: f64,
269}
270
271impl ConflictResolver {
272 pub fn new(default_strategy: ResolutionStrategy) -> Self {
274 Self {
275 default_strategy,
276 strategy_overrides: Vec::new(),
277 semantic_rules: Vec::new(),
278 node_priorities: HashMap::new(),
279 resolution_stats: Arc::new(RwLock::new(ResolutionStatistics::default())),
280 }
281 }
282
283 pub fn add_strategy_override(
285 &mut self,
286 pattern: OperationPattern,
287 strategy: ResolutionStrategy,
288 ) {
289 self.strategy_overrides.push((pattern, strategy));
290 }
291
292 pub fn add_semantic_rule(&mut self, rule: SemanticRule) {
294 self.semantic_rules.push(rule);
295 }
296
297 pub fn set_node_priority(&mut self, node_id: OxirsNodeId, priority: u32) {
299 self.node_priorities.insert(node_id, priority);
300 }
301
302 pub async fn detect_conflicts(
304 &self,
305 operations: &[TimestampedOperation],
306 ) -> Result<Vec<ConflictType>> {
307 let mut conflicts = Vec::new();
308
309 for i in 0..operations.len() {
311 for j in (i + 1)..operations.len() {
312 let op1 = &operations[i];
313 let op2 = &operations[j];
314
315 if let Some(conflict) = self.check_operation_conflict(op1, op2).await? {
316 conflicts.push(conflict);
317 }
318 }
319 }
320
321 let semantic_conflicts = self.check_semantic_conflicts(operations).await?;
323 conflicts.extend(semantic_conflicts);
324
325 Ok(conflicts)
326 }
327
328 pub async fn resolve_conflicts(
330 &self,
331 conflicts: &[ConflictType],
332 ) -> Result<Vec<ResolutionResult>> {
333 let start_time = std::time::Instant::now();
334 let mut results = Vec::new();
335
336 for conflict in conflicts {
337 let result = self.resolve_single_conflict(conflict).await?;
338 results.push(result);
339 }
340
341 let resolution_time = start_time.elapsed();
343 self.update_statistics(&results, resolution_time).await;
344
345 Ok(results)
346 }
347
348 async fn check_operation_conflict(
350 &self,
351 op1: &TimestampedOperation,
352 op2: &TimestampedOperation,
353 ) -> Result<Option<ConflictType>> {
354 if op1.vector_clock.happens_before(&op2.vector_clock)
356 || op2.vector_clock.happens_before(&op1.vector_clock)
357 {
358 return Ok(None);
359 }
360
361 match (&op1.operation, &op2.operation) {
362 (
364 RdfOperation::Insert {
365 subject: s1,
366 predicate: p1,
367 object: o1,
368 graph: g1,
369 },
370 RdfOperation::Insert {
371 subject: s2,
372 predicate: p2,
373 object: o2,
374 graph: g2,
375 },
376 ) => {
377 if s1 == s2 && p1 == p2 && o1 != o2 && g1 == g2 {
378 Ok(Some(ConflictType::WriteWrite {
379 operation1: op1.clone(),
380 operation2: op2.clone(),
381 }))
382 } else {
383 Ok(None)
384 }
385 }
386
387 (
389 RdfOperation::Delete {
390 subject: s1,
391 predicate: p1,
392 object: o1,
393 graph: g1,
394 },
395 RdfOperation::Update {
396 old_triple: (s2, p2, o2),
397 graph: g2,
398 ..
399 },
400 ) => {
401 if s1 == s2 && p1 == p2 && o1 == o2 && g1 == g2 {
402 Ok(Some(ConflictType::DeleteUpdate {
403 delete_op: op1.clone(),
404 update_op: op2.clone(),
405 }))
406 } else {
407 Ok(None)
408 }
409 }
410
411 (RdfOperation::Clear { graph: _g1 }, _) | (_, RdfOperation::Clear { graph: _g1 }) => {
413 let clear_op = if matches!(op1.operation, RdfOperation::Clear { .. }) {
414 op1.clone()
415 } else {
416 op2.clone()
417 };
418 let other_op = if matches!(op1.operation, RdfOperation::Clear { .. }) {
419 op2.clone()
420 } else {
421 op1.clone()
422 };
423
424 Ok(Some(ConflictType::Clear {
425 clear_op,
426 conflicting_ops: vec![other_op],
427 }))
428 }
429
430 _ => Ok(None),
431 }
432 }
433
434 async fn check_semantic_conflicts(
436 &self,
437 operations: &[TimestampedOperation],
438 ) -> Result<Vec<ConflictType>> {
439 let mut conflicts = Vec::new();
440
441 for rule in &self.semantic_rules {
443 let matching_ops: Vec<_> = operations
444 .iter()
445 .filter(|op| self.operation_matches_pattern(&op.operation, &rule.pattern))
446 .cloned()
447 .collect();
448
449 if matching_ops.len() > 1 {
450 conflicts.push(ConflictType::Semantic {
451 conflicting_ops: matching_ops,
452 constraint_violation: rule.description.clone(),
453 });
454 }
455 }
456
457 Ok(conflicts)
458 }
459
460 async fn resolve_single_conflict(&self, conflict: &ConflictType) -> Result<ResolutionResult> {
462 let strategy = self.select_resolution_strategy(conflict).await;
463
464 match conflict {
465 ConflictType::WriteWrite {
466 operation1,
467 operation2,
468 } => {
469 self.resolve_write_write_conflict(operation1, operation2, &strategy)
470 .await
471 }
472 ConflictType::DeleteUpdate {
473 delete_op,
474 update_op,
475 } => {
476 self.resolve_delete_update_conflict(delete_op, update_op, &strategy)
477 .await
478 }
479 ConflictType::Semantic {
480 conflicting_ops,
481 constraint_violation,
482 } => {
483 self.resolve_semantic_conflict(conflicting_ops, constraint_violation, &strategy)
484 .await
485 }
486 ConflictType::Clear {
487 clear_op,
488 conflicting_ops,
489 } => {
490 self.resolve_clear_conflict(clear_op, conflicting_ops, &strategy)
491 .await
492 }
493 }
494 }
495
496 async fn select_resolution_strategy(&self, conflict: &ConflictType) -> ResolutionStrategy {
498 let operations = match conflict {
500 ConflictType::WriteWrite {
501 operation1,
502 operation2,
503 } => vec![operation1, operation2],
504 ConflictType::DeleteUpdate {
505 delete_op,
506 update_op,
507 } => vec![delete_op, update_op],
508 ConflictType::Semantic {
509 conflicting_ops, ..
510 } => conflicting_ops.iter().collect(),
511 ConflictType::Clear {
512 clear_op,
513 conflicting_ops,
514 } => {
515 let mut ops = vec![clear_op];
516 ops.extend(conflicting_ops.iter());
517 ops
518 }
519 };
520
521 for op in &operations {
523 for (pattern, strategy) in &self.strategy_overrides {
524 if self.operation_matches_pattern(&op.operation, pattern) {
525 return strategy.clone();
526 }
527 }
528 }
529
530 self.default_strategy.clone()
532 }
533
534 async fn resolve_write_write_conflict(
536 &self,
537 op1: &TimestampedOperation,
538 op2: &TimestampedOperation,
539 strategy: &ResolutionStrategy,
540 ) -> Result<ResolutionResult> {
541 let (resolved, rejected) = match strategy {
542 ResolutionStrategy::LastWriterWins => {
543 if op1.physical_time >= op2.physical_time {
544 (vec![op1.clone()], vec![op2.clone()])
545 } else {
546 (vec![op2.clone()], vec![op1.clone()])
547 }
548 }
549 ResolutionStrategy::FirstWriterWins => {
550 if op1.physical_time <= op2.physical_time {
551 (vec![op1.clone()], vec![op2.clone()])
552 } else {
553 (vec![op2.clone()], vec![op1.clone()])
554 }
555 }
556 ResolutionStrategy::PriorityBased => {
557 if op1.priority >= op2.priority {
558 (vec![op1.clone()], vec![op2.clone()])
559 } else {
560 (vec![op2.clone()], vec![op1.clone()])
561 }
562 }
563 ResolutionStrategy::NodePriority { node_priorities } => {
564 let priority1 = node_priorities.get(&op1.origin_node).unwrap_or(&0);
565 let priority2 = node_priorities.get(&op2.origin_node).unwrap_or(&0);
566
567 if priority1 >= priority2 {
568 (vec![op1.clone()], vec![op2.clone()])
569 } else {
570 (vec![op2.clone()], vec![op1.clone()])
571 }
572 }
573 _ => {
574 if op1.physical_time >= op2.physical_time {
576 (vec![op1.clone()], vec![op2.clone()])
577 } else {
578 (vec![op2.clone()], vec![op1.clone()])
579 }
580 }
581 };
582
583 Ok(ResolutionResult {
584 conflicting_operations: vec![op1.clone(), op2.clone()],
585 resolved_operations: resolved,
586 rejected_operations: rejected,
587 strategy_used: strategy.clone(),
588 metadata: HashMap::new(),
589 })
590 }
591
592 async fn resolve_delete_update_conflict(
594 &self,
595 delete_op: &TimestampedOperation,
596 update_op: &TimestampedOperation,
597 strategy: &ResolutionStrategy,
598 ) -> Result<ResolutionResult> {
599 let (resolved, rejected) = match strategy {
600 ResolutionStrategy::LastWriterWins => {
601 if delete_op.physical_time >= update_op.physical_time {
602 (vec![delete_op.clone()], vec![update_op.clone()])
603 } else {
604 (vec![update_op.clone()], vec![delete_op.clone()])
605 }
606 }
607 _ => {
608 (vec![delete_op.clone()], vec![update_op.clone()])
610 }
611 };
612
613 Ok(ResolutionResult {
614 conflicting_operations: vec![delete_op.clone(), update_op.clone()],
615 resolved_operations: resolved,
616 rejected_operations: rejected,
617 strategy_used: strategy.clone(),
618 metadata: HashMap::new(),
619 })
620 }
621
622 async fn resolve_semantic_conflict(
624 &self,
625 conflicting_ops: &[TimestampedOperation],
626 _constraint_violation: &str,
627 strategy: &ResolutionStrategy,
628 ) -> Result<ResolutionResult> {
629 let (resolved, rejected) = match strategy {
630 ResolutionStrategy::SemanticResolution { resolution_rules } => {
631 let mut resolved = Vec::new();
633 let mut rejected = conflicting_ops.to_vec();
634
635 for rule in resolution_rules {
636 match &rule.action {
637 ResolutionAction::AcceptFirst => {
638 if let Some(first_op) = conflicting_ops.first() {
639 resolved = vec![first_op.clone()];
640 rejected = conflicting_ops[1..].to_vec();
641 }
642 break;
643 }
644 ResolutionAction::AcceptLast => {
645 if let Some(last_op) = conflicting_ops.last() {
646 resolved = vec![last_op.clone()];
647 rejected = conflicting_ops[..conflicting_ops.len() - 1].to_vec();
648 }
649 break;
650 }
651 ResolutionAction::AcceptHighestPriority => {
652 if let Some(highest_priority_op) =
653 conflicting_ops.iter().max_by_key(|op| op.priority)
654 {
655 resolved = vec![highest_priority_op.clone()];
656 rejected = conflicting_ops
657 .iter()
658 .filter(|op| {
659 op.operation_id != highest_priority_op.operation_id
660 })
661 .cloned()
662 .collect();
663 }
664 break;
665 }
666 ResolutionAction::RejectAll => {
667 resolved = Vec::new();
668 rejected = conflicting_ops.to_vec();
669 break;
670 }
671 _ => continue,
672 }
673 }
674
675 (resolved, rejected)
676 }
677 _ => {
678 (Vec::new(), conflicting_ops.to_vec())
680 }
681 };
682
683 Ok(ResolutionResult {
684 conflicting_operations: conflicting_ops.to_vec(),
685 resolved_operations: resolved,
686 rejected_operations: rejected,
687 strategy_used: strategy.clone(),
688 metadata: HashMap::new(),
689 })
690 }
691
692 async fn resolve_clear_conflict(
694 &self,
695 clear_op: &TimestampedOperation,
696 conflicting_ops: &[TimestampedOperation],
697 _strategy: &ResolutionStrategy,
698 ) -> Result<ResolutionResult> {
699 Ok(ResolutionResult {
701 conflicting_operations: {
702 let mut ops = vec![clear_op.clone()];
703 ops.extend(conflicting_ops.iter().cloned());
704 ops
705 },
706 resolved_operations: vec![clear_op.clone()],
707 rejected_operations: conflicting_ops.to_vec(),
708 strategy_used: ResolutionStrategy::FirstWriterWins, metadata: HashMap::new(),
710 })
711 }
712
713 fn operation_matches_pattern(
715 &self,
716 operation: &RdfOperation,
717 pattern: &OperationPattern,
718 ) -> bool {
719 if let Some(expected_type) = &pattern.operation_type {
721 let actual_type = match operation {
722 RdfOperation::Insert { .. } => OperationType::Insert,
723 RdfOperation::Delete { .. } => OperationType::Delete,
724 RdfOperation::Update { .. } => OperationType::Update,
725 RdfOperation::Clear { .. } => OperationType::Clear,
726 RdfOperation::Batch { .. } => return false, };
728
729 if &actual_type != expected_type {
730 return false;
731 }
732 }
733
734 match operation {
736 RdfOperation::Insert {
737 subject,
738 predicate,
739 object,
740 ..
741 }
742 | RdfOperation::Delete {
743 subject,
744 predicate,
745 object,
746 ..
747 } => self.check_triple_pattern(subject, predicate, object, pattern),
748 RdfOperation::Update {
749 new_triple: (subject, predicate, object),
750 ..
751 } => self.check_triple_pattern(subject, predicate, object, pattern),
752 _ => true, }
754 }
755
756 fn check_triple_pattern(
758 &self,
759 subject: &str,
760 predicate: &str,
761 object: &str,
762 pattern: &OperationPattern,
763 ) -> bool {
764 if let Some(subject_pattern) = &pattern.subject_pattern {
765 if !self.matches_wildcard_pattern(subject, subject_pattern) {
766 return false;
767 }
768 }
769
770 if let Some(predicate_pattern) = &pattern.predicate_pattern {
771 if !self.matches_wildcard_pattern(predicate, predicate_pattern) {
772 return false;
773 }
774 }
775
776 if let Some(object_pattern) = &pattern.object_pattern {
777 if !self.matches_wildcard_pattern(object, object_pattern) {
778 return false;
779 }
780 }
781
782 true
783 }
784
785 fn matches_wildcard_pattern(&self, value: &str, pattern: &str) -> bool {
787 if pattern == "*" {
788 return true;
789 }
790
791 if pattern.contains('*') {
793 let parts: Vec<_> = pattern.split('*').collect();
794 let mut value_pos = 0;
795
796 for (i, part) in parts.iter().enumerate() {
797 if part.is_empty() {
798 continue;
799 }
800
801 if i == 0 {
802 if !value[value_pos..].starts_with(part) {
804 return false;
805 }
806 value_pos += part.len();
807 } else if i == parts.len() - 1 {
808 return value[value_pos..].ends_with(part);
810 } else {
811 if let Some(pos) = value[value_pos..].find(part) {
813 value_pos += pos + part.len();
814 } else {
815 return false;
816 }
817 }
818 }
819
820 true
821 } else {
822 value == pattern
823 }
824 }
825
826 async fn update_statistics(&self, results: &[ResolutionResult], resolution_time: Duration) {
828 let mut stats = self.resolution_stats.write().await;
829
830 stats.total_conflicts += results.len() as u64;
831
832 let total_time = stats.average_resolution_time.as_nanos() * stats.total_conflicts as u128
834 + resolution_time.as_nanos();
835 stats.average_resolution_time = Duration::from_nanos(
836 (total_time / (stats.total_conflicts + results.len() as u64) as u128) as u64,
837 );
838
839 for result in results {
841 let strategy_name = format!("{:?}", result.strategy_used);
842 *stats.strategies_used.entry(strategy_name).or_insert(0) += 1;
843 }
844
845 let manual_resolutions = results
847 .iter()
848 .filter(|r| matches!(r.strategy_used, ResolutionStrategy::Manual))
849 .count();
850 let total_resolutions = results.len();
851 stats.success_rate = if total_resolutions > 0 {
852 1.0 - (manual_resolutions as f64 / total_resolutions as f64)
853 } else {
854 1.0
855 };
856 }
857
858 pub async fn get_statistics(&self) -> ResolutionStatistics {
860 self.resolution_stats.read().await.clone()
861 }
862}
863
864impl Default for VectorClock {
865 fn default() -> Self {
866 Self::new()
867 }
868}
869
870#[cfg(test)]
871mod tests {
872 use super::*;
873
874 fn create_test_operation(
875 id: &str,
876 node: OxirsNodeId,
877 operation: RdfOperation,
878 ) -> TimestampedOperation {
879 TimestampedOperation {
880 operation_id: id.to_string(),
881 origin_node: node,
882 vector_clock: VectorClock::new(),
883 physical_time: UNIX_EPOCH,
884 operation,
885 priority: 0,
886 }
887 }
888
889 #[test]
890 fn test_vector_clock_operations() {
891 let mut clock1 = VectorClock::new();
892 let mut clock2 = VectorClock::new();
893
894 clock1.increment(1);
895 clock1.increment(1);
896 clock2.increment(2);
897
898 assert!(!clock1.happens_before(&clock2));
899 assert!(!clock2.happens_before(&clock1));
900 assert!(clock1.is_concurrent(&clock2));
901
902 clock2.update(&clock1);
903 assert!(clock1.happens_before(&clock2));
904 }
905
906 #[tokio::test]
907 async fn test_write_write_conflict_detection() {
908 let resolver = ConflictResolver::new(ResolutionStrategy::LastWriterWins);
909
910 let op1 = create_test_operation(
911 "op1",
912 1,
913 RdfOperation::Insert {
914 subject: "s1".to_string(),
915 predicate: "p1".to_string(),
916 object: "o1".to_string(),
917 graph: None,
918 },
919 );
920
921 let op2 = create_test_operation(
922 "op2",
923 2,
924 RdfOperation::Insert {
925 subject: "s1".to_string(),
926 predicate: "p1".to_string(),
927 object: "o2".to_string(),
928 graph: None,
929 },
930 );
931
932 let conflicts = resolver.detect_conflicts(&[op1, op2]).await.unwrap();
933 assert_eq!(conflicts.len(), 1);
934 assert!(matches!(conflicts[0], ConflictType::WriteWrite { .. }));
935 }
936
937 #[tokio::test]
938 async fn test_conflict_resolution() {
939 let resolver = ConflictResolver::new(ResolutionStrategy::LastWriterWins);
940
941 let mut op1 = create_test_operation(
942 "op1",
943 1,
944 RdfOperation::Insert {
945 subject: "s1".to_string(),
946 predicate: "p1".to_string(),
947 object: "o1".to_string(),
948 graph: None,
949 },
950 );
951 op1.physical_time = UNIX_EPOCH + Duration::from_secs(1);
952
953 let mut op2 = create_test_operation(
954 "op2",
955 2,
956 RdfOperation::Insert {
957 subject: "s1".to_string(),
958 predicate: "p1".to_string(),
959 object: "o2".to_string(),
960 graph: None,
961 },
962 );
963 op2.physical_time = UNIX_EPOCH + Duration::from_secs(2);
964
965 let conflict = ConflictType::WriteWrite {
966 operation1: op1.clone(),
967 operation2: op2.clone(),
968 };
969
970 let results = resolver.resolve_conflicts(&[conflict]).await.unwrap();
971 assert_eq!(results.len(), 1);
972
973 let result = &results[0];
974 assert_eq!(result.resolved_operations.len(), 1);
975 assert_eq!(result.resolved_operations[0].operation_id, "op2"); assert_eq!(result.rejected_operations.len(), 1);
977 assert_eq!(result.rejected_operations[0].operation_id, "op1");
978 }
979
980 #[test]
981 fn test_wildcard_pattern_matching() {
982 let resolver = ConflictResolver::new(ResolutionStrategy::LastWriterWins);
983
984 assert!(resolver.matches_wildcard_pattern("hello", "*"));
985 assert!(resolver.matches_wildcard_pattern("hello", "hello"));
986 assert!(resolver.matches_wildcard_pattern("hello", "h*o"));
987 assert!(resolver.matches_wildcard_pattern("hello", "*lo"));
988 assert!(resolver.matches_wildcard_pattern("hello", "he*"));
989 assert!(!resolver.matches_wildcard_pattern("hello", "world"));
990 assert!(!resolver.matches_wildcard_pattern("hello", "h*x"));
991 }
992}