1#![allow(dead_code)]
7
8use crate::model::{Triple, TriplePattern};
9use crate::OxirsError;
10use scirs2_core::random::{Random, RngExt};
11use serde::{Deserialize, Serialize};
12use std::collections::{BTreeMap, BTreeSet, HashMap};
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
16#[derive(Debug, Clone)]
18pub struct CrdtConfig {
19 pub node_id: String,
21 pub crdt_type: CrdtType,
23 pub gc_config: GcConfig,
25 pub delta_config: DeltaConfig,
27}
28
/// CRDT variants that a [`CrdtConfig`] can name.
///
/// Only some variants have a matching type in this file; the others are
/// plain labels as far as the visible code is concerned.
#[derive(Debug, Clone)]
pub enum CrdtType {
    /// Grow-only set (see `GrowSet`).
    GSet,
    /// Two-phase set (see `TwoPhaseSet`).
    TwoPhaseSet,
    /// Add-remove partial order; no implementation is visible in this file.
    AddRemovePartialOrder,
    /// Observed-remove set (see `OrSet`).
    OrSet,
    /// Last-writer-wins set; no implementation is visible in this file.
    LwwSet,
    /// Multi-value register; no implementation is visible in this file.
    MvRegister,
    /// RDF triple CRDT (see `RdfCrdt`).
    RdfCrdt,
}
47
/// Settings that govern tombstone garbage collection.
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Whether collection should run automatically. Not consulted by any code
    /// visible in this file.
    pub auto_gc: bool,
    /// Interval between collections, in seconds. Not consulted by any code
    /// visible in this file.
    pub interval_secs: u64,
    /// Tombstone time-to-live in seconds; `RdfCrdt::garbage_collect` subtracts
    /// it from the current Unix time to obtain its cutoff.
    pub tombstone_ttl_secs: u64,
    /// Batch size for collection work. Not consulted by any code visible in
    /// this file.
    pub batch_size: usize,
}
60
61impl Default for GcConfig {
62 fn default() -> Self {
63 GcConfig {
64 auto_gc: true,
65 interval_secs: 3600, tombstone_ttl_secs: 86400 * 7, batch_size: 1000,
68 }
69 }
70}
71
/// Settings for delta-based synchronisation.
///
/// None of these fields is read by code visible in this file; the meanings
/// below are taken from the field names and should be confirmed against the
/// consumers of this configuration.
#[derive(Debug, Clone)]
pub struct DeltaConfig {
    /// Presumably toggles delta synchronisation on or off.
    pub enabled: bool,
    /// Presumably the upper bound on the size of a single delta.
    pub max_delta_size: usize,
    /// Presumably the capacity of the delta buffer.
    pub buffer_size: usize,
    /// Presumably whether deltas are compressed.
    pub compression: bool,
}
84
85impl Default for DeltaConfig {
86 fn default() -> Self {
87 DeltaConfig {
88 enabled: true,
89 max_delta_size: 10000,
90 buffer_size: 100000,
91 compression: true,
92 }
93 }
94}
95
96pub trait Crdt: Send + Sync {
98 type Delta: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>;
100
101 fn merge(&mut self, other: &Self);
103
104 fn delta(&self) -> Option<Self::Delta>;
106
107 fn apply_delta(&mut self, delta: Self::Delta);
109
110 fn reset_delta(&mut self);
112}
113
114#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
116pub struct ElementId {
117 pub timestamp: u64,
119 pub node_id: String,
121 pub random: u64,
123}
124
125impl ElementId {
126 pub fn new(timestamp: u64, node_id: String) -> Self {
128 ElementId {
129 timestamp,
130 node_id,
131 random: {
132 let mut rng = Random::default();
133 rng.random::<u64>()
134 },
135 }
136 }
137}
138
/// Grow-only set: elements can be added but never removed.
#[derive(Debug, Clone)]
pub struct GrowSet<T: Clone + Ord + Send + Sync> {
    /// Every element currently in the set.
    elements: BTreeSet<T>,
    /// Elements recorded since the last delta reset; `None` means no delta is
    /// being tracked.
    delta_elements: Option<BTreeSet<T>>,
}
147
148impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> Default for GrowSet<T> {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
154impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> GrowSet<T> {
155 pub fn new() -> Self {
157 GrowSet {
158 elements: BTreeSet::new(),
159 delta_elements: Some(BTreeSet::new()),
160 }
161 }
162
163 pub fn add(&mut self, element: T) {
165 if self.elements.insert(element.clone()) {
166 if let Some(ref mut delta) = self.delta_elements {
167 delta.insert(element);
168 }
169 }
170 }
171
172 pub fn contains(&self, element: &T) -> bool {
174 self.elements.contains(element)
175 }
176
177 pub fn elements(&self) -> &BTreeSet<T> {
179 &self.elements
180 }
181}
182
183impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> Crdt for GrowSet<T> {
184 type Delta = BTreeSet<T>;
185
186 fn merge(&mut self, other: &Self) {
187 for element in &other.elements {
188 self.add(element.clone());
189 }
190 }
191
192 fn delta(&self) -> Option<Self::Delta> {
193 self.delta_elements.clone()
194 }
195
196 fn apply_delta(&mut self, delta: Self::Delta) {
197 for element in delta {
198 self.elements.insert(element);
199 }
200 }
201
202 fn reset_delta(&mut self) {
203 self.delta_elements = Some(BTreeSet::new());
204 }
205}
206
/// Two-phase set: an element can be added and later removed, but once
/// removed it cannot be added again.
#[derive(Debug, Clone)]
pub struct TwoPhaseSet<T: Clone + Ord + Send + Sync> {
    /// Every element that has ever been added.
    added: BTreeSet<T>,
    /// Every element that has been removed.
    removed: BTreeSet<T>,
    /// Additions recorded since construction; `None` disables tracking.
    delta_added: Option<BTreeSet<T>>,
    /// Removals recorded since construction; `None` disables tracking.
    delta_removed: Option<BTreeSet<T>>,
}
218
219impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> Default
220 for TwoPhaseSet<T>
221{
222 fn default() -> Self {
223 Self::new()
224 }
225}
226
227impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> TwoPhaseSet<T> {
228 pub fn new() -> Self {
230 TwoPhaseSet {
231 added: BTreeSet::new(),
232 removed: BTreeSet::new(),
233 delta_added: Some(BTreeSet::new()),
234 delta_removed: Some(BTreeSet::new()),
235 }
236 }
237
238 pub fn add(&mut self, element: T) {
240 if !self.removed.contains(&element) && self.added.insert(element.clone()) {
241 if let Some(ref mut delta) = self.delta_added {
242 delta.insert(element);
243 }
244 }
245 }
246
247 pub fn remove(&mut self, element: T) {
249 if self.added.contains(&element) && self.removed.insert(element.clone()) {
250 if let Some(ref mut delta) = self.delta_removed {
251 delta.insert(element);
252 }
253 }
254 }
255
256 pub fn contains(&self, element: &T) -> bool {
258 self.added.contains(element) && !self.removed.contains(element)
259 }
260
261 pub fn elements(&self) -> BTreeSet<T> {
263 self.added.difference(&self.removed).cloned().collect()
264 }
265}
266
267#[derive(Debug, Clone)]
269pub struct OrSet<T: Clone + Ord + Send + Sync> {
270 elements: BTreeMap<T, BTreeSet<ElementId>>,
272 tombstones: BTreeMap<T, BTreeSet<ElementId>>,
274 node_id: String,
276 clock: u64,
278 delta: Option<OrSetDelta<T>>,
280}
281
282#[derive(Debug, Clone, Serialize, Deserialize)]
283pub struct OrSetDelta<T: Clone + Ord> {
284 added: BTreeMap<T, BTreeSet<ElementId>>,
286 removed: BTreeMap<T, BTreeSet<ElementId>>,
288}
289
290impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> OrSet<T> {
291 pub fn new(node_id: String) -> Self {
293 OrSet {
294 elements: BTreeMap::new(),
295 tombstones: BTreeMap::new(),
296 node_id,
297 clock: 0,
298 delta: Some(OrSetDelta {
299 added: BTreeMap::new(),
300 removed: BTreeMap::new(),
301 }),
302 }
303 }
304
305 pub fn add(&mut self, element: T) {
307 self.clock += 1;
308 let tag = ElementId::new(self.clock, self.node_id.clone());
309
310 self.elements
311 .entry(element.clone())
312 .or_default()
313 .insert(tag.clone());
314
315 if let Some(ref mut delta) = self.delta {
316 delta
317 .added
318 .entry(element)
319 .or_insert_with(BTreeSet::new)
320 .insert(tag);
321 }
322 }
323
324 pub fn remove(&mut self, element: &T) {
326 if let Some(tags) = self.elements.get(element).cloned() {
327 self.tombstones.insert(element.clone(), tags.clone());
328 self.elements.remove(element);
329
330 if let Some(ref mut delta) = self.delta {
331 delta.removed.insert(element.clone(), tags);
332 }
333 }
334 }
335
336 pub fn contains(&self, element: &T) -> bool {
338 if let Some(tags) = self.elements.get(element) {
339 if let Some(tombstone_tags) = self.tombstones.get(element) {
340 !tags.is_subset(tombstone_tags)
342 } else {
343 true
344 }
345 } else {
346 false
347 }
348 }
349
350 pub fn elements(&self) -> BTreeSet<T> {
352 self.elements
353 .keys()
354 .filter(|e| self.contains(e))
355 .cloned()
356 .collect()
357 }
358}
359
360impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> Crdt for OrSet<T> {
361 type Delta = OrSetDelta<T>;
362
363 fn merge(&mut self, other: &Self) {
364 for (element, tags) in &other.elements {
366 self.elements
367 .entry(element.clone())
368 .or_default()
369 .extend(tags.iter().cloned());
370 }
371
372 for (element, tags) in &other.tombstones {
374 self.tombstones
375 .entry(element.clone())
376 .or_default()
377 .extend(tags.iter().cloned());
378 }
379
380 let to_remove: Vec<_> = self
382 .elements
383 .iter()
384 .filter(|(e, tags)| {
385 if let Some(tombstone_tags) = self.tombstones.get(e) {
386 tags.is_subset(tombstone_tags)
387 } else {
388 false
389 }
390 })
391 .map(|(e, _)| e.clone())
392 .collect();
393
394 for element in to_remove {
395 self.elements.remove(&element);
396 }
397
398 self.clock = self.clock.max(other.clock);
400 }
401
402 fn delta(&self) -> Option<Self::Delta> {
403 self.delta.clone()
404 }
405
406 fn apply_delta(&mut self, delta: Self::Delta) {
407 for (element, tags) in delta.added {
409 self.elements.entry(element).or_default().extend(tags);
410 }
411
412 for (element, tags) in delta.removed {
414 self.tombstones
415 .entry(element.clone())
416 .or_default()
417 .extend(tags);
418
419 if let Some(elem_tags) = self.elements.get(&element) {
421 if let Some(tombstone_tags) = self.tombstones.get(&element) {
422 if elem_tags.is_subset(tombstone_tags) {
423 self.elements.remove(&element);
424 }
425 }
426 }
427 }
428 }
429
430 fn reset_delta(&mut self) {
431 self.delta = Some(OrSetDelta {
432 added: BTreeMap::new(),
433 removed: BTreeMap::new(),
434 });
435 }
436}
437
438pub struct RdfCrdt {
440 config: CrdtConfig,
442 triples: OrSet<Triple>,
444 predicate_index: HashMap<String, OrSet<Triple>>,
446 subject_index: HashMap<String, OrSet<Triple>>,
448 stats: Arc<RwLock<CrdtStats>>,
450}
451
/// Internal operation counters maintained by `RdfCrdt`.
#[derive(Debug, Default)]
struct CrdtStats {
    /// Count of add and remove operations.
    total_ops: u64,
    /// Count of add operations.
    add_ops: u64,
    /// Count of remove operations.
    remove_ops: u64,
    /// Count of merge operations.
    merge_ops: u64,
    /// Number of live triples after the most recent mutating operation.
    triple_count: usize,
    /// Never written by the visible code; `RdfCrdt::stats` reports the live
    /// tombstone map size instead.
    #[allow(dead_code)]
    tombstone_count: usize,
}
469
470impl RdfCrdt {
471 pub async fn new(config: CrdtConfig) -> Result<Self, OxirsError> {
473 let node_id = config.node_id.clone();
474
475 Ok(RdfCrdt {
476 config,
477 triples: OrSet::new(node_id),
478 predicate_index: HashMap::new(),
479 subject_index: HashMap::new(),
480 stats: Arc::new(RwLock::new(CrdtStats::default())),
481 })
482 }
483
484 pub async fn add_triple(&mut self, triple: Triple) -> Result<(), OxirsError> {
486 self.triples.add(triple.clone());
488
489 let predicate_str = match triple.predicate() {
491 crate::model::Predicate::NamedNode(nn) => nn.as_str(),
492 crate::model::Predicate::Variable(v) => v.as_str(),
493 };
494 self.predicate_index
495 .entry(predicate_str.to_string())
496 .or_insert_with(|| OrSet::new(self.config.node_id.clone()))
497 .add(triple.clone());
498
499 let subject_str = match triple.subject() {
501 crate::model::Subject::NamedNode(nn) => nn.as_str(),
502 crate::model::Subject::BlankNode(bn) => bn.as_str(),
503 crate::model::Subject::Variable(v) => v.as_str(),
504 crate::model::Subject::QuotedTriple(_qt) => "<<quoted-triple>>",
505 };
506 self.subject_index
507 .entry(subject_str.to_string())
508 .or_insert_with(|| OrSet::new(self.config.node_id.clone()))
509 .add(triple);
510
511 let mut stats = self.stats.write().await;
513 stats.total_ops += 1;
514 stats.add_ops += 1;
515 stats.triple_count = self.triples.elements().len();
516
517 Ok(())
518 }
519
520 pub async fn remove_triple(&mut self, triple: &Triple) -> Result<(), OxirsError> {
522 self.triples.remove(triple);
524
525 let predicate_str = match triple.predicate() {
527 crate::model::Predicate::NamedNode(nn) => nn.as_str(),
528 crate::model::Predicate::Variable(v) => v.as_str(),
529 };
530 if let Some(predicate_set) = self.predicate_index.get_mut(predicate_str) {
531 predicate_set.remove(triple);
532 }
533
534 let subject_str = match triple.subject() {
536 crate::model::Subject::NamedNode(nn) => nn.as_str(),
537 crate::model::Subject::BlankNode(bn) => bn.as_str(),
538 crate::model::Subject::Variable(v) => v.as_str(),
539 crate::model::Subject::QuotedTriple(_qt) => "<<quoted-triple>>",
540 };
541 if let Some(subject_set) = self.subject_index.get_mut(subject_str) {
542 subject_set.remove(triple);
543 }
544
545 let mut stats = self.stats.write().await;
547 stats.total_ops += 1;
548 stats.remove_ops += 1;
549 stats.triple_count = self.triples.elements().len();
550
551 Ok(())
552 }
553
554 pub async fn query(&self, pattern: &TriplePattern) -> Result<Vec<Triple>, OxirsError> {
556 let results = match (pattern.subject(), pattern.predicate(), pattern.object()) {
557 (Some(subject), Some(_predicate), _) => {
558 if let Some(subject_set) = self.subject_index.get(subject.as_str()) {
560 subject_set
561 .elements()
562 .into_iter()
563 .filter(|t| pattern.matches(t))
564 .collect()
565 } else {
566 Vec::new()
567 }
568 }
569 (Some(subject), None, _) => {
570 if let Some(subject_set) = self.subject_index.get(subject.as_str()) {
572 subject_set
573 .elements()
574 .into_iter()
575 .filter(|t| pattern.matches(t))
576 .collect()
577 } else {
578 Vec::new()
579 }
580 }
581 (None, Some(predicate), _) => {
582 if let Some(predicate_set) = self.predicate_index.get(predicate.as_str()) {
584 predicate_set
585 .elements()
586 .into_iter()
587 .filter(|t| pattern.matches(t))
588 .collect()
589 } else {
590 Vec::new()
591 }
592 }
593 _ => {
594 self.triples
596 .elements()
597 .into_iter()
598 .filter(|t| pattern.matches(t))
599 .collect()
600 }
601 };
602
603 Ok(results)
604 }
605
606 pub async fn merge(&mut self, other: &RdfCrdt) -> Result<(), OxirsError> {
608 self.triples.merge(&other.triples);
610
611 for (predicate, other_set) in &other.predicate_index {
613 self.predicate_index
614 .entry(predicate.clone())
615 .or_insert_with(|| OrSet::new(self.config.node_id.clone()))
616 .merge(other_set);
617 }
618
619 for (subject, other_set) in &other.subject_index {
621 self.subject_index
622 .entry(subject.clone())
623 .or_insert_with(|| OrSet::new(self.config.node_id.clone()))
624 .merge(other_set);
625 }
626
627 let mut stats = self.stats.write().await;
629 stats.merge_ops += 1;
630 stats.triple_count = self.triples.elements().len();
631
632 Ok(())
633 }
634
635 pub fn get_delta(&self) -> RdfCrdtDelta {
637 RdfCrdtDelta {
638 triples_delta: self.triples.delta(),
639 predicate_deltas: self
640 .predicate_index
641 .iter()
642 .filter_map(|(p, set)| set.delta().map(|d| (p.clone(), d)))
643 .collect(),
644 subject_deltas: self
645 .subject_index
646 .iter()
647 .filter_map(|(s, set)| set.delta().map(|d| (s.clone(), d)))
648 .collect(),
649 }
650 }
651
652 pub async fn apply_delta(&mut self, delta: RdfCrdtDelta) -> Result<(), OxirsError> {
654 if let Some(triples_delta) = delta.triples_delta {
656 self.triples.apply_delta(triples_delta);
657 }
658
659 for (predicate, pred_delta) in delta.predicate_deltas {
661 self.predicate_index
662 .entry(predicate)
663 .or_insert_with(|| OrSet::new(self.config.node_id.clone()))
664 .apply_delta(pred_delta);
665 }
666
667 for (subject, subj_delta) in delta.subject_deltas {
669 self.subject_index
670 .entry(subject)
671 .or_insert_with(|| OrSet::new(self.config.node_id.clone()))
672 .apply_delta(subj_delta);
673 }
674
675 let mut stats = self.stats.write().await;
677 stats.triple_count = self.triples.elements().len();
678
679 Ok(())
680 }
681
682 pub fn reset_delta(&mut self) {
684 self.triples.reset_delta();
685 for set in self.predicate_index.values_mut() {
686 set.reset_delta();
687 }
688 for set in self.subject_index.values_mut() {
689 set.reset_delta();
690 }
691 }
692
693 pub async fn garbage_collect(&mut self) -> Result<GcReport, OxirsError> {
695 let start_tombstones = self.triples.tombstones.len();
696
697 let cutoff = std::time::SystemTime::now()
699 .duration_since(std::time::UNIX_EPOCH)
700 .expect("system clock should be after Unix epoch")
701 .as_secs()
702 - self.config.gc_config.tombstone_ttl_secs;
703
704 self.triples
706 .tombstones
707 .retain(|_, tags| tags.iter().any(|tag| tag.timestamp > cutoff));
708
709 for set in self.predicate_index.values_mut() {
711 set.tombstones
712 .retain(|_, tags| tags.iter().any(|tag| tag.timestamp > cutoff));
713 }
714
715 for set in self.subject_index.values_mut() {
716 set.tombstones
717 .retain(|_, tags| tags.iter().any(|tag| tag.timestamp > cutoff));
718 }
719
720 let removed = start_tombstones - self.triples.tombstones.len();
721
722 Ok(GcReport {
723 tombstones_removed: removed,
724 space_reclaimed: removed * std::mem::size_of::<(Triple, BTreeSet<ElementId>)>(),
725 })
726 }
727
728 pub async fn stats(&self) -> CrdtStatsReport {
730 let stats = self.stats.read().await;
731 CrdtStatsReport {
732 total_ops: stats.total_ops,
733 add_ops: stats.add_ops,
734 remove_ops: stats.remove_ops,
735 merge_ops: stats.merge_ops,
736 triple_count: stats.triple_count,
737 tombstone_count: self.triples.tombstones.len(),
738 }
739 }
740}
741
742#[derive(Debug, Clone, Serialize, Deserialize)]
744pub struct RdfCrdtDelta {
745 pub triples_delta: Option<OrSetDelta<Triple>>,
747 pub predicate_deltas: HashMap<String, OrSetDelta<Triple>>,
749 pub subject_deltas: HashMap<String, OrSetDelta<Triple>>,
751}
752
/// Outcome of one `RdfCrdt::garbage_collect` run.
#[derive(Debug)]
pub struct GcReport {
    /// Number of tombstone entries dropped from the main triple set.
    pub tombstones_removed: usize,
    /// Estimated bytes reclaimed, derived from `tombstones_removed`.
    pub space_reclaimed: usize,
}
759
/// Public snapshot of an `RdfCrdt`'s counters, as returned by
/// `RdfCrdt::stats`.
#[derive(Debug)]
pub struct CrdtStatsReport {
    /// Count of add and remove operations.
    pub total_ops: u64,
    /// Count of add operations.
    pub add_ops: u64,
    /// Count of remove operations.
    pub remove_ops: u64,
    /// Count of merge operations.
    pub merge_ops: u64,
    /// Number of live triples recorded by the last mutating operation.
    pub triple_count: usize,
    /// Number of tombstone entries in the main set when the snapshot was
    /// taken.
    pub tombstone_count: usize,
}
770
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode, Object};

    /// Default RDF-CRDT configuration for the given node.
    fn rdf_config(node_id: &str) -> CrdtConfig {
        CrdtConfig {
            node_id: node_id.to_string(),
            crdt_type: CrdtType::RdfCrdt,
            gc_config: GcConfig::default(),
            delta_config: DeltaConfig::default(),
        }
    }

    /// Triple with named-node subject and predicate and a plain literal object.
    fn literal_triple(subject: &str, predicate: &str, value: &str) -> Triple {
        Triple::new(
            NamedNode::new(subject).expect("valid IRI"),
            NamedNode::new(predicate).expect("valid IRI"),
            Object::Literal(Literal::new(value)),
        )
    }

    /// Merging two grow-only sets yields their union.
    #[tokio::test]
    async fn test_grow_set() {
        let mut left = GrowSet::new();
        let mut right = GrowSet::new();

        for value in [1, 2] {
            left.add(value);
        }
        for value in [2, 3] {
            right.add(value);
        }

        left.merge(&right);

        for value in [1, 2, 3] {
            assert!(left.contains(&value));
        }
        assert_eq!(left.elements().len(), 3);
    }

    /// A local removal does not hide an element another replica added under a
    /// different tag.
    #[tokio::test]
    async fn test_or_set() {
        let mut left = OrSet::new("node1".to_string());
        let mut right = OrSet::new("node2".to_string());

        left.add(1);
        left.add(2);
        right.add(2);
        right.add(3);

        left.remove(&2);

        left.merge(&right);

        assert!(left.contains(&1));
        // Still present: the copy added on node2 carries an untombstoned tag.
        assert!(left.contains(&2));
        assert!(left.contains(&3));
    }

    /// Add, query by subject, remove, and query again.
    #[tokio::test]
    async fn test_rdf_crdt() {
        let mut crdt = RdfCrdt::new(rdf_config("node1"))
            .await
            .expect("async operation should succeed");

        let triple1 = literal_triple("http://example.org/s1", "http://example.org/p1", "value1");
        let triple2 = literal_triple("http://example.org/s1", "http://example.org/p2", "value2");

        for triple in [&triple1, &triple2] {
            crdt.add_triple(triple.clone())
                .await
                .expect("async operation should succeed");
        }

        let by_subject = TriplePattern::new(
            Some(crate::model::SubjectPattern::NamedNode(
                NamedNode::new("http://example.org/s1").expect("valid IRI"),
            )),
            None,
            None,
        );

        let before = crdt
            .query(&by_subject)
            .await
            .expect("async operation should succeed");
        assert_eq!(before.len(), 2);

        crdt.remove_triple(&triple1)
            .await
            .expect("async operation should succeed");

        let after = crdt
            .query(&by_subject)
            .await
            .expect("async operation should succeed");
        assert_eq!(after.len(), 1);
        assert_eq!(after[0], triple2);
    }

    /// Merging two replicas makes both triples visible on the receiver.
    #[tokio::test]
    async fn test_rdf_crdt_merge() {
        let mut crdt1 = RdfCrdt::new(rdf_config("node1"))
            .await
            .expect("async operation should succeed");
        let mut crdt2 = RdfCrdt::new(rdf_config("node2"))
            .await
            .expect("async operation should succeed");

        let triple1 = literal_triple("http://example.org/s1", "http://example.org/p1", "value1");
        let triple2 = literal_triple("http://example.org/s2", "http://example.org/p2", "value2");

        crdt1
            .add_triple(triple1.clone())
            .await
            .expect("async operation should succeed");
        crdt2
            .add_triple(triple2.clone())
            .await
            .expect("async operation should succeed");

        crdt1
            .merge(&crdt2)
            .await
            .expect("async operation should succeed");

        let match_all = TriplePattern::new(None, None, None);
        let merged = crdt1
            .query(&match_all)
            .await
            .expect("async operation should succeed");
        assert_eq!(merged.len(), 2);
    }
}