Skip to main content

oxirs_core/distributed/
crdt.rs

1//! CRDTs for conflict-free replicated RDF
2//!
3//! This module implements Conflict-free Replicated Data Types (CRDTs) optimized
4//! for RDF data, enabling eventual consistency without coordination.
5
6#![allow(dead_code)]
7
8use crate::model::{Triple, TriplePattern};
9use crate::OxirsError;
10use scirs2_core::random::{Random, RngExt};
11use serde::{Deserialize, Serialize};
12use std::collections::{BTreeMap, BTreeSet, HashMap};
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
16/// CRDT configuration
17#[derive(Debug, Clone)]
18pub struct CrdtConfig {
19    /// Node ID for this replica
20    pub node_id: String,
21    /// CRDT type to use
22    pub crdt_type: CrdtType,
23    /// Garbage collection configuration
24    pub gc_config: GcConfig,
25    /// Delta sync configuration
26    pub delta_config: DeltaConfig,
27}
28
/// CRDT flavours a replica can be configured with.
///
/// The variants carry no data, so the type derives `PartialEq`/`Eq` and a
/// configured value can be compared directly (`config.crdt_type == CrdtType::OrSet`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CrdtType {
    /// Grow-only set (2P-Set without removals)
    GSet,
    /// Two-phase set (add and remove)
    TwoPhaseSet,
    /// Add-remove partial order
    AddRemovePartialOrder,
    /// Observed-remove set
    OrSet,
    /// Last-write-wins element set
    LwwSet,
    /// Multi-value register for conflicts
    MvRegister,
    /// RDF-specific CRDT
    RdfCrdt,
}
47
/// Tuning knobs for tombstone garbage collection.
#[derive(Clone, Debug)]
pub struct GcConfig {
    /// Turns automatic garbage collection on or off.
    pub auto_gc: bool,
    /// Seconds between garbage-collection passes.
    pub interval_secs: u64,
    /// Maximum tombstone age, in seconds.
    pub tombstone_ttl_secs: u64,
    /// Number of entries handled per garbage-collection batch.
    pub batch_size: usize,
}

impl Default for GcConfig {
    /// Automatic GC enabled, hourly passes, tombstones kept for one week,
    /// batches of 1000 entries.
    fn default() -> Self {
        const ONE_HOUR_SECS: u64 = 3600;
        const ONE_WEEK_SECS: u64 = 86400 * 7;

        Self {
            auto_gc: true,
            interval_secs: ONE_HOUR_SECS,
            tombstone_ttl_secs: ONE_WEEK_SECS,
            batch_size: 1000,
        }
    }
}
71
/// Settings for delta-based synchronisation between replicas.
#[derive(Clone, Debug)]
pub struct DeltaConfig {
    /// Turns delta synchronisation on or off.
    pub enabled: bool,
    /// Largest delta size allowed before falling back to a full sync.
    pub max_delta_size: usize,
    /// Size of the delta buffer.
    pub buffer_size: usize,
    /// Whether deltas are compressed.
    pub compression: bool,
}

impl Default for DeltaConfig {
    /// Delta sync and compression enabled, deltas capped at 10 000 and a
    /// buffer of 100 000.
    fn default() -> Self {
        Self {
            enabled: true,
            max_delta_size: 10_000,
            buffer_size: 100_000,
            compression: true,
        }
    }
}
95
96/// Base trait for CRDTs
97pub trait Crdt: Send + Sync {
98    /// Type of delta for this CRDT
99    type Delta: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>;
100
101    /// Merge with another CRDT state
102    fn merge(&mut self, other: &Self);
103
104    /// Get delta since last checkpoint
105    fn delta(&self) -> Option<Self::Delta>;
106
107    /// Apply delta
108    fn apply_delta(&mut self, delta: Self::Delta);
109
110    /// Reset delta tracking
111    fn reset_delta(&mut self);
112}
113
114/// Unique ID for CRDT elements
115#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
116pub struct ElementId {
117    /// Timestamp (Lamport clock)
118    pub timestamp: u64,
119    /// Node ID
120    pub node_id: String,
121    /// Random component for uniqueness
122    pub random: u64,
123}
124
125impl ElementId {
126    /// Create new element ID
127    pub fn new(timestamp: u64, node_id: String) -> Self {
128        ElementId {
129            timestamp,
130            node_id,
131            random: {
132                let mut rng = Random::default();
133                rng.random::<u64>()
134            },
135        }
136    }
137}
138
/// Grow-only set CRDT: the API offers insertion but no removal.
#[derive(Clone, Debug)]
pub struct GrowSet<T>
where
    T: Clone + Ord + Send + Sync,
{
    /// Every element added or merged in so far.
    elements: BTreeSet<T>,
    /// Elements recorded for the pending delta; nothing is recorded while this is `None`.
    delta_elements: Option<BTreeSet<T>>,
}
147
148impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> Default for GrowSet<T> {
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
154impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> GrowSet<T> {
155    /// Create new grow-only set
156    pub fn new() -> Self {
157        GrowSet {
158            elements: BTreeSet::new(),
159            delta_elements: Some(BTreeSet::new()),
160        }
161    }
162
163    /// Add element
164    pub fn add(&mut self, element: T) {
165        if self.elements.insert(element.clone()) {
166            if let Some(ref mut delta) = self.delta_elements {
167                delta.insert(element);
168            }
169        }
170    }
171
172    /// Check if contains element
173    pub fn contains(&self, element: &T) -> bool {
174        self.elements.contains(element)
175    }
176
177    /// Get all elements
178    pub fn elements(&self) -> &BTreeSet<T> {
179        &self.elements
180    }
181}
182
183impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> Crdt for GrowSet<T> {
184    type Delta = BTreeSet<T>;
185
186    fn merge(&mut self, other: &Self) {
187        for element in &other.elements {
188            self.add(element.clone());
189        }
190    }
191
192    fn delta(&self) -> Option<Self::Delta> {
193        self.delta_elements.clone()
194    }
195
196    fn apply_delta(&mut self, delta: Self::Delta) {
197        for element in delta {
198            self.elements.insert(element);
199        }
200    }
201
202    fn reset_delta(&mut self) {
203        self.delta_elements = Some(BTreeSet::new());
204    }
205}
206
/// Two-phase set CRDT: an element can be added and later removed, and a
/// removed element stays removed.
#[derive(Clone, Debug)]
pub struct TwoPhaseSet<T>
where
    T: Clone + Ord + Send + Sync,
{
    /// Elements that have been added.
    added: BTreeSet<T>,
    /// Tombstones: elements that have been removed.
    removed: BTreeSet<T>,
    /// Additions recorded for the pending delta.
    delta_added: Option<BTreeSet<T>>,
    /// Removals recorded for the pending delta.
    delta_removed: Option<BTreeSet<T>>,
}
218
219impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> Default
220    for TwoPhaseSet<T>
221{
222    fn default() -> Self {
223        Self::new()
224    }
225}
226
227impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> TwoPhaseSet<T> {
228    /// Create new two-phase set
229    pub fn new() -> Self {
230        TwoPhaseSet {
231            added: BTreeSet::new(),
232            removed: BTreeSet::new(),
233            delta_added: Some(BTreeSet::new()),
234            delta_removed: Some(BTreeSet::new()),
235        }
236    }
237
238    /// Add element
239    pub fn add(&mut self, element: T) {
240        if !self.removed.contains(&element) && self.added.insert(element.clone()) {
241            if let Some(ref mut delta) = self.delta_added {
242                delta.insert(element);
243            }
244        }
245    }
246
247    /// Remove element
248    pub fn remove(&mut self, element: T) {
249        if self.added.contains(&element) && self.removed.insert(element.clone()) {
250            if let Some(ref mut delta) = self.delta_removed {
251                delta.insert(element);
252            }
253        }
254    }
255
256    /// Check if contains element
257    pub fn contains(&self, element: &T) -> bool {
258        self.added.contains(element) && !self.removed.contains(element)
259    }
260
261    /// Get current elements
262    pub fn elements(&self) -> BTreeSet<T> {
263        self.added.difference(&self.removed).cloned().collect()
264    }
265}
266
267/// Observed-Remove Set (OR-Set) CRDT
268#[derive(Debug, Clone)]
269pub struct OrSet<T: Clone + Ord + Send + Sync> {
270    /// Elements with their unique tags
271    elements: BTreeMap<T, BTreeSet<ElementId>>,
272    /// Tombstones for removed elements
273    tombstones: BTreeMap<T, BTreeSet<ElementId>>,
274    /// Node ID for generating tags
275    node_id: String,
276    /// Lamport clock
277    clock: u64,
278    /// Delta tracking
279    delta: Option<OrSetDelta<T>>,
280}
281
282#[derive(Debug, Clone, Serialize, Deserialize)]
283pub struct OrSetDelta<T: Clone + Ord> {
284    /// Added elements with tags
285    added: BTreeMap<T, BTreeSet<ElementId>>,
286    /// Removed tombstones
287    removed: BTreeMap<T, BTreeSet<ElementId>>,
288}
289
290impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> OrSet<T> {
291    /// Create new OR-Set
292    pub fn new(node_id: String) -> Self {
293        OrSet {
294            elements: BTreeMap::new(),
295            tombstones: BTreeMap::new(),
296            node_id,
297            clock: 0,
298            delta: Some(OrSetDelta {
299                added: BTreeMap::new(),
300                removed: BTreeMap::new(),
301            }),
302        }
303    }
304
305    /// Add element
306    pub fn add(&mut self, element: T) {
307        self.clock += 1;
308        let tag = ElementId::new(self.clock, self.node_id.clone());
309
310        self.elements
311            .entry(element.clone())
312            .or_default()
313            .insert(tag.clone());
314
315        if let Some(ref mut delta) = self.delta {
316            delta
317                .added
318                .entry(element)
319                .or_insert_with(BTreeSet::new)
320                .insert(tag);
321        }
322    }
323
324    /// Remove element
325    pub fn remove(&mut self, element: &T) {
326        if let Some(tags) = self.elements.get(element).cloned() {
327            self.tombstones.insert(element.clone(), tags.clone());
328            self.elements.remove(element);
329
330            if let Some(ref mut delta) = self.delta {
331                delta.removed.insert(element.clone(), tags);
332            }
333        }
334    }
335
336    /// Check if contains element
337    pub fn contains(&self, element: &T) -> bool {
338        if let Some(tags) = self.elements.get(element) {
339            if let Some(tombstone_tags) = self.tombstones.get(element) {
340                // Element exists if it has tags not in tombstones
341                !tags.is_subset(tombstone_tags)
342            } else {
343                true
344            }
345        } else {
346            false
347        }
348    }
349
350    /// Get current elements
351    pub fn elements(&self) -> BTreeSet<T> {
352        self.elements
353            .keys()
354            .filter(|e| self.contains(e))
355            .cloned()
356            .collect()
357    }
358}
359
360impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> Crdt for OrSet<T> {
361    type Delta = OrSetDelta<T>;
362
363    fn merge(&mut self, other: &Self) {
364        // Merge elements
365        for (element, tags) in &other.elements {
366            self.elements
367                .entry(element.clone())
368                .or_default()
369                .extend(tags.iter().cloned());
370        }
371
372        // Merge tombstones
373        for (element, tags) in &other.tombstones {
374            self.tombstones
375                .entry(element.clone())
376                .or_default()
377                .extend(tags.iter().cloned());
378        }
379
380        // Remove elements that are fully tombstoned
381        let to_remove: Vec<_> = self
382            .elements
383            .iter()
384            .filter(|(e, tags)| {
385                if let Some(tombstone_tags) = self.tombstones.get(e) {
386                    tags.is_subset(tombstone_tags)
387                } else {
388                    false
389                }
390            })
391            .map(|(e, _)| e.clone())
392            .collect();
393
394        for element in to_remove {
395            self.elements.remove(&element);
396        }
397
398        // Update clock
399        self.clock = self.clock.max(other.clock);
400    }
401
402    fn delta(&self) -> Option<Self::Delta> {
403        self.delta.clone()
404    }
405
406    fn apply_delta(&mut self, delta: Self::Delta) {
407        // Apply added elements
408        for (element, tags) in delta.added {
409            self.elements.entry(element).or_default().extend(tags);
410        }
411
412        // Apply removed tombstones
413        for (element, tags) in delta.removed {
414            self.tombstones
415                .entry(element.clone())
416                .or_default()
417                .extend(tags);
418
419            // Remove element if fully tombstoned
420            if let Some(elem_tags) = self.elements.get(&element) {
421                if let Some(tombstone_tags) = self.tombstones.get(&element) {
422                    if elem_tags.is_subset(tombstone_tags) {
423                        self.elements.remove(&element);
424                    }
425                }
426            }
427        }
428    }
429
430    fn reset_delta(&mut self) {
431        self.delta = Some(OrSetDelta {
432            added: BTreeMap::new(),
433            removed: BTreeMap::new(),
434        });
435    }
436}
437
438/// RDF-specific CRDT optimized for triple stores
439pub struct RdfCrdt {
440    /// Configuration
441    config: CrdtConfig,
442    /// Triple OR-Set for conflict-free triple management
443    triples: OrSet<Triple>,
444    /// Predicate-indexed sets for efficient queries
445    predicate_index: HashMap<String, OrSet<Triple>>,
446    /// Subject-indexed sets
447    subject_index: HashMap<String, OrSet<Triple>>,
448    /// Statistics
449    stats: Arc<RwLock<CrdtStats>>,
450}
451
/// Internal operation counters kept by `RdfCrdt`.
#[derive(Default, Debug)]
struct CrdtStats {
    /// Number of add and remove operations performed.
    total_ops: u64,
    /// Number of add operations.
    add_ops: u64,
    /// Number of remove operations.
    remove_ops: u64,
    /// Number of merge operations.
    merge_ops: u64,
    /// Live triple count after the most recent operation.
    triple_count: usize,
    /// Tombstone count.
    #[allow(dead_code)]
    tombstone_count: usize,
}
469
470impl RdfCrdt {
471    /// Create new RDF CRDT
472    pub async fn new(config: CrdtConfig) -> Result<Self, OxirsError> {
473        let node_id = config.node_id.clone();
474
475        Ok(RdfCrdt {
476            config,
477            triples: OrSet::new(node_id),
478            predicate_index: HashMap::new(),
479            subject_index: HashMap::new(),
480            stats: Arc::new(RwLock::new(CrdtStats::default())),
481        })
482    }
483
484    /// Add triple
485    pub async fn add_triple(&mut self, triple: Triple) -> Result<(), OxirsError> {
486        // Add to main set
487        self.triples.add(triple.clone());
488
489        // Update predicate index
490        let predicate_str = match triple.predicate() {
491            crate::model::Predicate::NamedNode(nn) => nn.as_str(),
492            crate::model::Predicate::Variable(v) => v.as_str(),
493        };
494        self.predicate_index
495            .entry(predicate_str.to_string())
496            .or_insert_with(|| OrSet::new(self.config.node_id.clone()))
497            .add(triple.clone());
498
499        // Update subject index
500        let subject_str = match triple.subject() {
501            crate::model::Subject::NamedNode(nn) => nn.as_str(),
502            crate::model::Subject::BlankNode(bn) => bn.as_str(),
503            crate::model::Subject::Variable(v) => v.as_str(),
504            crate::model::Subject::QuotedTriple(_qt) => "<<quoted-triple>>",
505        };
506        self.subject_index
507            .entry(subject_str.to_string())
508            .or_insert_with(|| OrSet::new(self.config.node_id.clone()))
509            .add(triple);
510
511        // Update stats
512        let mut stats = self.stats.write().await;
513        stats.total_ops += 1;
514        stats.add_ops += 1;
515        stats.triple_count = self.triples.elements().len();
516
517        Ok(())
518    }
519
520    /// Remove triple
521    pub async fn remove_triple(&mut self, triple: &Triple) -> Result<(), OxirsError> {
522        // Remove from main set
523        self.triples.remove(triple);
524
525        // Update predicate index
526        let predicate_str = match triple.predicate() {
527            crate::model::Predicate::NamedNode(nn) => nn.as_str(),
528            crate::model::Predicate::Variable(v) => v.as_str(),
529        };
530        if let Some(predicate_set) = self.predicate_index.get_mut(predicate_str) {
531            predicate_set.remove(triple);
532        }
533
534        // Update subject index
535        let subject_str = match triple.subject() {
536            crate::model::Subject::NamedNode(nn) => nn.as_str(),
537            crate::model::Subject::BlankNode(bn) => bn.as_str(),
538            crate::model::Subject::Variable(v) => v.as_str(),
539            crate::model::Subject::QuotedTriple(_qt) => "<<quoted-triple>>",
540        };
541        if let Some(subject_set) = self.subject_index.get_mut(subject_str) {
542            subject_set.remove(triple);
543        }
544
545        // Update stats
546        let mut stats = self.stats.write().await;
547        stats.total_ops += 1;
548        stats.remove_ops += 1;
549        stats.triple_count = self.triples.elements().len();
550
551        Ok(())
552    }
553
554    /// Query triples by pattern
555    pub async fn query(&self, pattern: &TriplePattern) -> Result<Vec<Triple>, OxirsError> {
556        let results = match (pattern.subject(), pattern.predicate(), pattern.object()) {
557            (Some(subject), Some(_predicate), _) => {
558                // Use both subject and predicate index
559                if let Some(subject_set) = self.subject_index.get(subject.as_str()) {
560                    subject_set
561                        .elements()
562                        .into_iter()
563                        .filter(|t| pattern.matches(t))
564                        .collect()
565                } else {
566                    Vec::new()
567                }
568            }
569            (Some(subject), None, _) => {
570                // Use subject index
571                if let Some(subject_set) = self.subject_index.get(subject.as_str()) {
572                    subject_set
573                        .elements()
574                        .into_iter()
575                        .filter(|t| pattern.matches(t))
576                        .collect()
577                } else {
578                    Vec::new()
579                }
580            }
581            (None, Some(predicate), _) => {
582                // Use predicate index
583                if let Some(predicate_set) = self.predicate_index.get(predicate.as_str()) {
584                    predicate_set
585                        .elements()
586                        .into_iter()
587                        .filter(|t| pattern.matches(t))
588                        .collect()
589                } else {
590                    Vec::new()
591                }
592            }
593            _ => {
594                // Full scan
595                self.triples
596                    .elements()
597                    .into_iter()
598                    .filter(|t| pattern.matches(t))
599                    .collect()
600            }
601        };
602
603        Ok(results)
604    }
605
606    /// Merge with another RDF CRDT
607    pub async fn merge(&mut self, other: &RdfCrdt) -> Result<(), OxirsError> {
608        // Merge main triple set
609        self.triples.merge(&other.triples);
610
611        // Merge predicate indexes
612        for (predicate, other_set) in &other.predicate_index {
613            self.predicate_index
614                .entry(predicate.clone())
615                .or_insert_with(|| OrSet::new(self.config.node_id.clone()))
616                .merge(other_set);
617        }
618
619        // Merge subject indexes
620        for (subject, other_set) in &other.subject_index {
621            self.subject_index
622                .entry(subject.clone())
623                .or_insert_with(|| OrSet::new(self.config.node_id.clone()))
624                .merge(other_set);
625        }
626
627        // Update stats
628        let mut stats = self.stats.write().await;
629        stats.merge_ops += 1;
630        stats.triple_count = self.triples.elements().len();
631
632        Ok(())
633    }
634
635    /// Get delta for synchronization
636    pub fn get_delta(&self) -> RdfCrdtDelta {
637        RdfCrdtDelta {
638            triples_delta: self.triples.delta(),
639            predicate_deltas: self
640                .predicate_index
641                .iter()
642                .filter_map(|(p, set)| set.delta().map(|d| (p.clone(), d)))
643                .collect(),
644            subject_deltas: self
645                .subject_index
646                .iter()
647                .filter_map(|(s, set)| set.delta().map(|d| (s.clone(), d)))
648                .collect(),
649        }
650    }
651
652    /// Apply delta from another replica
653    pub async fn apply_delta(&mut self, delta: RdfCrdtDelta) -> Result<(), OxirsError> {
654        // Apply main triple delta
655        if let Some(triples_delta) = delta.triples_delta {
656            self.triples.apply_delta(triples_delta);
657        }
658
659        // Apply predicate deltas
660        for (predicate, pred_delta) in delta.predicate_deltas {
661            self.predicate_index
662                .entry(predicate)
663                .or_insert_with(|| OrSet::new(self.config.node_id.clone()))
664                .apply_delta(pred_delta);
665        }
666
667        // Apply subject deltas
668        for (subject, subj_delta) in delta.subject_deltas {
669            self.subject_index
670                .entry(subject)
671                .or_insert_with(|| OrSet::new(self.config.node_id.clone()))
672                .apply_delta(subj_delta);
673        }
674
675        // Update stats
676        let mut stats = self.stats.write().await;
677        stats.triple_count = self.triples.elements().len();
678
679        Ok(())
680    }
681
682    /// Reset delta tracking
683    pub fn reset_delta(&mut self) {
684        self.triples.reset_delta();
685        for set in self.predicate_index.values_mut() {
686            set.reset_delta();
687        }
688        for set in self.subject_index.values_mut() {
689            set.reset_delta();
690        }
691    }
692
693    /// Garbage collect tombstones
694    pub async fn garbage_collect(&mut self) -> Result<GcReport, OxirsError> {
695        let start_tombstones = self.triples.tombstones.len();
696
697        // Remove old tombstones based on age
698        let cutoff = std::time::SystemTime::now()
699            .duration_since(std::time::UNIX_EPOCH)
700            .expect("system clock should be after Unix epoch")
701            .as_secs()
702            - self.config.gc_config.tombstone_ttl_secs;
703
704        // Filter tombstones by age
705        self.triples
706            .tombstones
707            .retain(|_, tags| tags.iter().any(|tag| tag.timestamp > cutoff));
708
709        // Same for indexes
710        for set in self.predicate_index.values_mut() {
711            set.tombstones
712                .retain(|_, tags| tags.iter().any(|tag| tag.timestamp > cutoff));
713        }
714
715        for set in self.subject_index.values_mut() {
716            set.tombstones
717                .retain(|_, tags| tags.iter().any(|tag| tag.timestamp > cutoff));
718        }
719
720        let removed = start_tombstones - self.triples.tombstones.len();
721
722        Ok(GcReport {
723            tombstones_removed: removed,
724            space_reclaimed: removed * std::mem::size_of::<(Triple, BTreeSet<ElementId>)>(),
725        })
726    }
727
728    /// Get statistics
729    pub async fn stats(&self) -> CrdtStatsReport {
730        let stats = self.stats.read().await;
731        CrdtStatsReport {
732            total_ops: stats.total_ops,
733            add_ops: stats.add_ops,
734            remove_ops: stats.remove_ops,
735            merge_ops: stats.merge_ops,
736            triple_count: stats.triple_count,
737            tombstone_count: self.triples.tombstones.len(),
738        }
739    }
740}
741
742/// RDF CRDT delta for efficient synchronization
743#[derive(Debug, Clone, Serialize, Deserialize)]
744pub struct RdfCrdtDelta {
745    /// Main triple set delta
746    pub triples_delta: Option<OrSetDelta<Triple>>,
747    /// Predicate index deltas
748    pub predicate_deltas: HashMap<String, OrSetDelta<Triple>>,
749    /// Subject index deltas
750    pub subject_deltas: HashMap<String, OrSetDelta<Triple>>,
751}
752
/// Outcome of one garbage-collection pass.
#[derive(Debug)]
pub struct GcReport {
    /// Number of tombstone entries dropped from the main triple set.
    pub tombstones_removed: usize,
    /// Estimated number of bytes freed.
    pub space_reclaimed: usize,
}
759
/// Public snapshot of the counters kept by `RdfCrdt`.
#[derive(Debug)]
pub struct CrdtStatsReport {
    /// Number of add and remove operations performed.
    pub total_ops: u64,
    /// Number of add operations.
    pub add_ops: u64,
    /// Number of remove operations.
    pub remove_ops: u64,
    /// Number of merge operations.
    pub merge_ops: u64,
    /// Live triple count recorded by the most recent operation.
    pub triple_count: usize,
    /// Number of tombstone entries in the main triple set.
    pub tombstone_count: usize,
}
770
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode, Object};

    /// Builds an `RdfCrdt` configuration for `node_id` with default GC and
    /// delta settings.
    fn rdf_config(node_id: &str) -> CrdtConfig {
        CrdtConfig {
            node_id: node_id.to_string(),
            crdt_type: CrdtType::RdfCrdt,
            gc_config: GcConfig::default(),
            delta_config: DeltaConfig::default(),
        }
    }

    /// Builds a triple with IRI subject and predicate and a literal object.
    fn literal_triple(subject: &str, predicate: &str, value: &str) -> Triple {
        Triple::new(
            NamedNode::new(subject).expect("valid IRI"),
            NamedNode::new(predicate).expect("valid IRI"),
            Object::Literal(Literal::new(value)),
        )
    }

    #[tokio::test]
    async fn test_grow_set() {
        let mut left = GrowSet::new();
        let mut right = GrowSet::new();

        for value in [1, 2] {
            left.add(value);
        }
        for value in [2, 3] {
            right.add(value);
        }

        left.merge(&right);

        for value in [1, 2, 3] {
            assert!(left.contains(&value));
        }
        assert_eq!(left.elements().len(), 3);
    }

    #[tokio::test]
    async fn test_or_set() {
        let mut left = OrSet::new("node1".to_string());
        let mut right = OrSet::new("node2".to_string());

        left.add(1);
        left.add(2);
        right.add(2);
        right.add(3);

        // The removal only covers the tag `left` has observed for 2.
        left.remove(&2);

        left.merge(&right);

        assert!(left.contains(&1));
        assert!(left.contains(&2)); // Survives through the tag created by `right`
        assert!(left.contains(&3));
    }

    #[tokio::test]
    async fn test_rdf_crdt() {
        let mut crdt = RdfCrdt::new(rdf_config("node1"))
            .await
            .expect("async operation should succeed");

        // Two triples sharing one subject
        let first = literal_triple(
            "http://example.org/s1",
            "http://example.org/p1",
            "value1",
        );
        let second = literal_triple(
            "http://example.org/s1",
            "http://example.org/p2",
            "value2",
        );

        for triple in [first.clone(), second.clone()] {
            crdt.add_triple(triple)
                .await
                .expect("async operation should succeed");
        }

        // Query by subject
        let by_subject = TriplePattern::new(
            Some(crate::model::SubjectPattern::NamedNode(
                NamedNode::new("http://example.org/s1").expect("valid IRI"),
            )),
            None,
            None,
        );

        let before = crdt
            .query(&by_subject)
            .await
            .expect("async operation should succeed");
        assert_eq!(before.len(), 2);

        // Remove one triple and query again
        crdt.remove_triple(&first)
            .await
            .expect("async operation should succeed");

        let after = crdt
            .query(&by_subject)
            .await
            .expect("async operation should succeed");
        assert_eq!(after.len(), 1);
        assert_eq!(after[0], second);
    }

    #[tokio::test]
    async fn test_rdf_crdt_merge() {
        let mut local = RdfCrdt::new(rdf_config("node1"))
            .await
            .expect("async operation should succeed");
        let mut remote = RdfCrdt::new(rdf_config("node2"))
            .await
            .expect("async operation should succeed");

        // A different triple on each replica
        let local_triple = literal_triple(
            "http://example.org/s1",
            "http://example.org/p1",
            "value1",
        );
        let remote_triple = literal_triple(
            "http://example.org/s2",
            "http://example.org/p2",
            "value2",
        );

        local
            .add_triple(local_triple.clone())
            .await
            .expect("async operation should succeed");
        remote
            .add_triple(remote_triple.clone())
            .await
            .expect("async operation should succeed");

        local
            .merge(&remote)
            .await
            .expect("async operation should succeed");

        // After the merge the local replica holds both triples
        let everything = TriplePattern::new(None, None, None);
        let merged = local
            .query(&everything)
            .await
            .expect("async operation should succeed");
        assert_eq!(merged.len(), 2);
    }
}