#![allow(dead_code)]
use crate::model::{Triple, TriplePattern};
use crate::OxirsError;
use scirs2_core::random::{Random, Rng};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use tokio::sync::RwLock;
/// Configuration bundle for one CRDT replica.
#[derive(Clone, Debug)]
pub struct CrdtConfig {
    /// Identifier of this replica; `RdfCrdt` passes it to every `OrSet` it creates.
    pub node_id: String,
    /// CRDT variant selected for this replica.
    pub crdt_type: CrdtType,
    /// Garbage-collection settings.
    pub gc_config: GcConfig,
    /// Delta-synchronisation settings.
    pub delta_config: DeltaConfig,
}
/// CRDT variants that a `CrdtConfig` can select.
#[derive(Clone, Debug)]
pub enum CrdtType {
    /// Grow-only set (see `GrowSet`).
    GSet,
    /// Two-phase set (see `TwoPhaseSet`).
    TwoPhaseSet,
    /// Add-remove partial order.
    AddRemovePartialOrder,
    /// Observed-remove set (see `OrSet`).
    OrSet,
    /// Presumably a last-writer-wins set; no implementation is visible in this file.
    LwwSet,
    /// Presumably a multi-value register; no implementation is visible in this file.
    MvRegister,
    /// RDF triple store CRDT (see `RdfCrdt`).
    RdfCrdt,
}
/// Tombstone garbage-collection settings.
#[derive(Clone, Debug)]
pub struct GcConfig {
    /// Flag for automatic garbage collection (not read anywhere in this file).
    pub auto_gc: bool,
    /// Collection interval in seconds (not read anywhere in this file).
    pub interval_secs: u64,
    /// Tombstone time-to-live in seconds; `RdfCrdt::garbage_collect` derives its cutoff from it.
    pub tombstone_ttl_secs: u64,
    /// Batch size for collection (not read anywhere in this file).
    pub batch_size: usize,
}

impl Default for GcConfig {
    /// Automatic GC on, hourly interval, seven-day tombstone TTL, batches of 1000.
    fn default() -> Self {
        Self {
            auto_gc: true,
            interval_secs: 60 * 60,
            tombstone_ttl_secs: 7 * 24 * 60 * 60,
            batch_size: 1_000,
        }
    }
}
/// Delta-state synchronisation settings.
#[derive(Clone, Debug)]
pub struct DeltaConfig {
    /// Flag enabling delta synchronisation (not read anywhere in this file).
    pub enabled: bool,
    /// Upper bound on a single delta's size (not read anywhere in this file).
    pub max_delta_size: usize,
    /// Size of the delta buffer (not read anywhere in this file).
    pub buffer_size: usize,
    /// Flag enabling delta compression (not read anywhere in this file).
    pub compression: bool,
}

impl Default for DeltaConfig {
    /// Deltas enabled and compressed, 10 000 max delta size, 100 000 buffer size.
    fn default() -> Self {
        Self {
            enabled: true,
            max_delta_size: 10_000,
            buffer_size: 100_000,
            compression: true,
        }
    }
}
/// Common interface of the delta-capable CRDTs in this module.
///
/// Implementors keep a full state plus a buffer of changes made since the
/// last [`Crdt::reset_delta`] call.
pub trait Crdt: Send + Sync {
/// Serializable description of the changes buffered since the last reset.
type Delta: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>;
/// Folds the full state of `other` into `self`.
fn merge(&mut self, other: &Self);
/// Returns a copy of the buffered delta, or `None` if no delta is tracked.
fn delta(&self) -> Option<Self::Delta>;
/// Applies a delta produced by another replica to `self`.
fn apply_delta(&mut self, delta: Self::Delta);
/// Replaces the buffered delta with an empty one.
fn reset_delta(&mut self);
}
/// Unique tag attached to each insertion into an [`OrSet`].
///
/// The derived ordering compares fields in declaration order:
/// `timestamp`, then `node_id`, then `random`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ElementId {
/// Value supplied by the caller of [`ElementId::new`]; `OrSet::add` passes its
/// logical clock counter here.
pub timestamp: u64,
/// Identifier of the node that created the tag.
pub node_id: String,
/// Random component drawn in [`ElementId::new`].
pub random: u64,
}
impl ElementId {
    /// Creates a tag from `timestamp` and `node_id`, adding a random `u64`
    /// drawn from a default-constructed `Random` generator.
    pub fn new(timestamp: u64, node_id: String) -> Self {
        let mut generator = Random::default();
        let random = generator.random::<u64>();
        Self {
            timestamp,
            node_id,
            random,
        }
    }
}
/// Grow-only set: elements can be added but never removed.
#[derive(Clone, Debug)]
pub struct GrowSet<T>
where
    T: Clone + Ord + Send + Sync,
{
    /// Every element added so far.
    elements: BTreeSet<T>,
    /// Elements added since the last delta reset; `None` means no delta is tracked.
    delta_elements: Option<BTreeSet<T>>,
}
impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> Default for GrowSet<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> GrowSet<T>
where
    T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>,
{
    /// Creates an empty set whose delta buffer is present and empty.
    pub fn new() -> Self {
        let elements = BTreeSet::new();
        let delta_elements = Some(BTreeSet::new());
        GrowSet {
            elements,
            delta_elements,
        }
    }

    /// Inserts `element`; a newly inserted element is also recorded in the
    /// delta buffer when one is tracked.
    pub fn add(&mut self, element: T) {
        let newly_inserted = self.elements.insert(element.clone());
        if !newly_inserted {
            return;
        }
        if let Some(pending) = self.delta_elements.as_mut() {
            pending.insert(element);
        }
    }

    /// Returns `true` if `element` has been added.
    pub fn contains(&self, element: &T) -> bool {
        self.elements.contains(element)
    }

    /// Borrows the full element set.
    pub fn elements(&self) -> &BTreeSet<T> {
        &self.elements
    }
}
impl<T> Crdt for GrowSet<T>
where
    T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>,
{
    type Delta = BTreeSet<T>;

    /// Adds every element of `other` through [`GrowSet::add`], so elements
    /// new to `self` also land in the delta buffer.
    fn merge(&mut self, other: &Self) {
        other
            .elements
            .iter()
            .cloned()
            .for_each(|element| self.add(element));
    }

    /// Returns a copy of the delta buffer.
    fn delta(&self) -> Option<Self::Delta> {
        self.delta_elements.as_ref().cloned()
    }

    /// Inserts the delta's elements directly into the set; the local delta
    /// buffer is left untouched.
    fn apply_delta(&mut self, delta: Self::Delta) {
        self.elements.extend(delta);
    }

    /// Replaces the delta buffer with an empty one.
    fn reset_delta(&mut self) {
        self.delta_elements = Some(BTreeSet::new());
    }
}
/// Two-phase set: an element can be added and later removed, but a removed
/// element cannot be added again.
#[derive(Clone, Debug)]
pub struct TwoPhaseSet<T>
where
    T: Clone + Ord + Send + Sync,
{
    /// Elements that have been added.
    added: BTreeSet<T>,
    /// Elements that have been removed.
    removed: BTreeSet<T>,
    /// Additions buffered since construction; `None` means not tracked.
    delta_added: Option<BTreeSet<T>>,
    /// Removals buffered since construction; `None` means not tracked.
    delta_removed: Option<BTreeSet<T>>,
}
impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> Default
for TwoPhaseSet<T>
{
fn default() -> Self {
Self::new()
}
}
impl<T> TwoPhaseSet<T>
where
    T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>,
{
    /// Creates an empty set with empty add/remove delta buffers.
    pub fn new() -> Self {
        Self {
            added: BTreeSet::new(),
            removed: BTreeSet::new(),
            delta_added: Some(BTreeSet::new()),
            delta_removed: Some(BTreeSet::new()),
        }
    }

    /// Adds `element` unless it was removed earlier or is already present;
    /// a successful add is also recorded in the add-delta buffer.
    pub fn add(&mut self, element: T) {
        if self.removed.contains(&element) {
            return;
        }
        if !self.added.insert(element.clone()) {
            return;
        }
        if let Some(pending) = self.delta_added.as_mut() {
            pending.insert(element);
        }
    }

    /// Removes `element` if it was added and not yet removed; a successful
    /// removal is also recorded in the remove-delta buffer.
    pub fn remove(&mut self, element: T) {
        if !self.added.contains(&element) {
            return;
        }
        if !self.removed.insert(element.clone()) {
            return;
        }
        if let Some(pending) = self.delta_removed.as_mut() {
            pending.insert(element);
        }
    }

    /// Returns `true` if `element` was added and has not been removed.
    pub fn contains(&self, element: &T) -> bool {
        !self.removed.contains(element) && self.added.contains(element)
    }

    /// Returns a fresh set of the added elements that have not been removed.
    pub fn elements(&self) -> BTreeSet<T> {
        self.added
            .iter()
            .filter(|candidate| !self.removed.contains(*candidate))
            .cloned()
            .collect()
    }
}
/// Observed-remove set: every insertion carries a unique [`ElementId`] tag,
/// and a removal tombstones exactly the tags it observed.
#[derive(Clone, Debug)]
pub struct OrSet<T>
where
    T: Clone + Ord + Send + Sync,
{
    /// Tags per element, as recorded by adds, merges and applied deltas.
    elements: BTreeMap<T, BTreeSet<ElementId>>,
    /// Tags per element that have been removed.
    tombstones: BTreeMap<T, BTreeSet<ElementId>>,
    /// Identifier of the local node, stamped into every new tag.
    node_id: String,
    /// Logical counter, incremented on each add and used as the tag timestamp.
    clock: u64,
    /// Changes buffered since the last delta reset; `None` means not tracked.
    delta: Option<OrSetDelta<T>>,
}
/// Serializable set of changes made to an [`OrSet`] since its last delta reset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrSetDelta<T: Clone + Ord> {
/// Tags added per element.
added: BTreeMap<T, BTreeSet<ElementId>>,
/// Tags tombstoned per element.
removed: BTreeMap<T, BTreeSet<ElementId>>,
}
impl<T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>> OrSet<T> {
    /// Creates an empty set owned by `node_id`, with the logical clock at zero
    /// and an empty delta buffer.
    pub fn new(node_id: String) -> Self {
        OrSet {
            elements: BTreeMap::new(),
            tombstones: BTreeMap::new(),
            node_id,
            clock: 0,
            delta: Some(OrSetDelta {
                added: BTreeMap::new(),
                removed: BTreeMap::new(),
            }),
        }
    }

    /// Adds `element` under a fresh tag.
    ///
    /// The logical clock is advanced first and becomes the tag's timestamp.
    /// The tag is recorded in the live set and, when a delta is tracked, in
    /// the delta's `added` map.
    pub fn add(&mut self, element: T) {
        self.clock += 1;
        let tag = ElementId::new(self.clock, self.node_id.clone());
        if let Some(delta) = self.delta.as_mut() {
            delta
                .added
                .entry(element.clone())
                .or_default()
                .insert(tag.clone());
        }
        self.elements.entry(element).or_default().insert(tag);
    }

    /// Removes `element` by tombstoning every tag currently observed for it.
    ///
    /// Does nothing when the element has no live tags. Tombstones accumulate:
    /// tags recorded by an earlier removal of the same element are kept, both
    /// in the local tombstone map and in the pending delta. Overwriting them
    /// would let a replica that still holds an older tag resurrect the element
    /// on merge.
    pub fn remove(&mut self, element: &T) {
        if let Some(tags) = self.elements.remove(element) {
            if let Some(delta) = self.delta.as_mut() {
                delta
                    .removed
                    .entry(element.clone())
                    .or_default()
                    .extend(tags.iter().cloned());
            }
            self.tombstones
                .entry(element.clone())
                .or_default()
                .extend(tags);
        }
    }

    /// Returns `true` if `element` has at least one tag that is not tombstoned.
    pub fn contains(&self, element: &T) -> bool {
        match (self.elements.get(element), self.tombstones.get(element)) {
            (Some(live), Some(dead)) => !live.is_subset(dead),
            (Some(_), None) => true,
            (None, _) => false,
        }
    }

    /// Returns a fresh set of all elements for which [`OrSet::contains`] holds.
    pub fn elements(&self) -> BTreeSet<T> {
        self.elements
            .keys()
            .filter(|e| self.contains(e))
            .cloned()
            .collect()
    }
}
impl<T> Crdt for OrSet<T>
where
    T: Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de>,
{
    type Delta = OrSetDelta<T>;

    /// Unions the tag and tombstone maps of `other` into `self`, drops every
    /// element whose tags are all tombstoned, and advances the clock to the
    /// larger of the two.
    fn merge(&mut self, other: &Self) {
        for (element, their_tags) in other.elements.iter() {
            let mine = self.elements.entry(element.clone()).or_default();
            mine.extend(their_tags.iter().cloned());
        }
        for (element, their_dead) in other.tombstones.iter() {
            let mine = self.tombstones.entry(element.clone()).or_default();
            mine.extend(their_dead.iter().cloned());
        }

        // Keep an element unless a tombstone entry exists that covers all its tags.
        let tombstones = &self.tombstones;
        self.elements.retain(|element, tags| {
            tombstones
                .get(element)
                .map_or(true, |dead| !tags.is_subset(dead))
        });

        self.clock = std::cmp::max(self.clock, other.clock);
    }

    /// Returns a copy of the buffered delta.
    fn delta(&self) -> Option<Self::Delta> {
        self.delta.as_ref().cloned()
    }

    /// Applies additions first, then removals; after each removal the element
    /// is dropped if all of its tags are tombstoned.
    fn apply_delta(&mut self, delta: Self::Delta) {
        let OrSetDelta { added, removed } = delta;

        for (element, tags) in added {
            self.elements.entry(element).or_default().extend(tags);
        }

        for (element, tags) in removed {
            let dead = self.tombstones.entry(element.clone()).or_default();
            dead.extend(tags);
            let fully_removed = self
                .elements
                .get(&element)
                .map_or(false, |live| live.is_subset(dead));
            if fully_removed {
                self.elements.remove(&element);
            }
        }
    }

    /// Replaces the buffered delta with an empty one.
    fn reset_delta(&mut self) {
        let empty = OrSetDelta {
            added: BTreeMap::new(),
            removed: BTreeMap::new(),
        };
        self.delta = Some(empty);
    }
}
/// CRDT-backed store of RDF triples with predicate and subject indexes.
pub struct RdfCrdt {
/// Configuration; `node_id` and `gc_config.tombstone_ttl_secs` are read by the methods below.
config: CrdtConfig,
/// Primary set holding every triple.
triples: OrSet<Triple>,
/// Triples grouped by the string form of their predicate.
predicate_index: HashMap<String, OrSet<Triple>>,
/// Triples grouped by the string form of their subject.
subject_index: HashMap<String, OrSet<Triple>>,
/// Operation counters behind an async read-write lock.
stats: Arc<RwLock<CrdtStats>>,
}
/// Internal operation counters maintained by `RdfCrdt`.
#[derive(Default, Debug)]
struct CrdtStats {
    /// Number of add and remove operations.
    total_ops: u64,
    /// Number of add operations.
    add_ops: u64,
    /// Number of remove operations.
    remove_ops: u64,
    /// Number of merge operations.
    merge_ops: u64,
    /// Live triple count as of the last mutating operation.
    triple_count: usize,
    /// Unused here; reports read the tombstone count from the set directly.
    #[allow(dead_code)]
    tombstone_count: usize,
}
impl RdfCrdt {
pub async fn new(config: CrdtConfig) -> Result<Self, OxirsError> {
let node_id = config.node_id.clone();
Ok(RdfCrdt {
config,
triples: OrSet::new(node_id),
predicate_index: HashMap::new(),
subject_index: HashMap::new(),
stats: Arc::new(RwLock::new(CrdtStats::default())),
})
}
pub async fn add_triple(&mut self, triple: Triple) -> Result<(), OxirsError> {
self.triples.add(triple.clone());
let predicate_str = match triple.predicate() {
crate::model::Predicate::NamedNode(nn) => nn.as_str(),
crate::model::Predicate::Variable(v) => v.as_str(),
};
self.predicate_index
.entry(predicate_str.to_string())
.or_insert_with(|| OrSet::new(self.config.node_id.clone()))
.add(triple.clone());
let subject_str = match triple.subject() {
crate::model::Subject::NamedNode(nn) => nn.as_str(),
crate::model::Subject::BlankNode(bn) => bn.as_str(),
crate::model::Subject::Variable(v) => v.as_str(),
crate::model::Subject::QuotedTriple(_qt) => "<<quoted-triple>>",
};
self.subject_index
.entry(subject_str.to_string())
.or_insert_with(|| OrSet::new(self.config.node_id.clone()))
.add(triple);
let mut stats = self.stats.write().await;
stats.total_ops += 1;
stats.add_ops += 1;
stats.triple_count = self.triples.elements().len();
Ok(())
}
pub async fn remove_triple(&mut self, triple: &Triple) -> Result<(), OxirsError> {
self.triples.remove(triple);
let predicate_str = match triple.predicate() {
crate::model::Predicate::NamedNode(nn) => nn.as_str(),
crate::model::Predicate::Variable(v) => v.as_str(),
};
if let Some(predicate_set) = self.predicate_index.get_mut(predicate_str) {
predicate_set.remove(triple);
}
let subject_str = match triple.subject() {
crate::model::Subject::NamedNode(nn) => nn.as_str(),
crate::model::Subject::BlankNode(bn) => bn.as_str(),
crate::model::Subject::Variable(v) => v.as_str(),
crate::model::Subject::QuotedTriple(_qt) => "<<quoted-triple>>",
};
if let Some(subject_set) = self.subject_index.get_mut(subject_str) {
subject_set.remove(triple);
}
let mut stats = self.stats.write().await;
stats.total_ops += 1;
stats.remove_ops += 1;
stats.triple_count = self.triples.elements().len();
Ok(())
}
pub async fn query(&self, pattern: &TriplePattern) -> Result<Vec<Triple>, OxirsError> {
let results = match (pattern.subject(), pattern.predicate(), pattern.object()) {
(Some(subject), Some(_predicate), _) => {
if let Some(subject_set) = self.subject_index.get(subject.as_str()) {
subject_set
.elements()
.into_iter()
.filter(|t| pattern.matches(t))
.collect()
} else {
Vec::new()
}
}
(Some(subject), None, _) => {
if let Some(subject_set) = self.subject_index.get(subject.as_str()) {
subject_set
.elements()
.into_iter()
.filter(|t| pattern.matches(t))
.collect()
} else {
Vec::new()
}
}
(None, Some(predicate), _) => {
if let Some(predicate_set) = self.predicate_index.get(predicate.as_str()) {
predicate_set
.elements()
.into_iter()
.filter(|t| pattern.matches(t))
.collect()
} else {
Vec::new()
}
}
_ => {
self.triples
.elements()
.into_iter()
.filter(|t| pattern.matches(t))
.collect()
}
};
Ok(results)
}
pub async fn merge(&mut self, other: &RdfCrdt) -> Result<(), OxirsError> {
self.triples.merge(&other.triples);
for (predicate, other_set) in &other.predicate_index {
self.predicate_index
.entry(predicate.clone())
.or_insert_with(|| OrSet::new(self.config.node_id.clone()))
.merge(other_set);
}
for (subject, other_set) in &other.subject_index {
self.subject_index
.entry(subject.clone())
.or_insert_with(|| OrSet::new(self.config.node_id.clone()))
.merge(other_set);
}
let mut stats = self.stats.write().await;
stats.merge_ops += 1;
stats.triple_count = self.triples.elements().len();
Ok(())
}
pub fn get_delta(&self) -> RdfCrdtDelta {
RdfCrdtDelta {
triples_delta: self.triples.delta(),
predicate_deltas: self
.predicate_index
.iter()
.filter_map(|(p, set)| set.delta().map(|d| (p.clone(), d)))
.collect(),
subject_deltas: self
.subject_index
.iter()
.filter_map(|(s, set)| set.delta().map(|d| (s.clone(), d)))
.collect(),
}
}
pub async fn apply_delta(&mut self, delta: RdfCrdtDelta) -> Result<(), OxirsError> {
if let Some(triples_delta) = delta.triples_delta {
self.triples.apply_delta(triples_delta);
}
for (predicate, pred_delta) in delta.predicate_deltas {
self.predicate_index
.entry(predicate)
.or_insert_with(|| OrSet::new(self.config.node_id.clone()))
.apply_delta(pred_delta);
}
for (subject, subj_delta) in delta.subject_deltas {
self.subject_index
.entry(subject)
.or_insert_with(|| OrSet::new(self.config.node_id.clone()))
.apply_delta(subj_delta);
}
let mut stats = self.stats.write().await;
stats.triple_count = self.triples.elements().len();
Ok(())
}
pub fn reset_delta(&mut self) {
self.triples.reset_delta();
for set in self.predicate_index.values_mut() {
set.reset_delta();
}
for set in self.subject_index.values_mut() {
set.reset_delta();
}
}
pub async fn garbage_collect(&mut self) -> Result<GcReport, OxirsError> {
let start_tombstones = self.triples.tombstones.len();
let cutoff = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system clock should be after Unix epoch")
.as_secs()
- self.config.gc_config.tombstone_ttl_secs;
self.triples
.tombstones
.retain(|_, tags| tags.iter().any(|tag| tag.timestamp > cutoff));
for set in self.predicate_index.values_mut() {
set.tombstones
.retain(|_, tags| tags.iter().any(|tag| tag.timestamp > cutoff));
}
for set in self.subject_index.values_mut() {
set.tombstones
.retain(|_, tags| tags.iter().any(|tag| tag.timestamp > cutoff));
}
let removed = start_tombstones - self.triples.tombstones.len();
Ok(GcReport {
tombstones_removed: removed,
space_reclaimed: removed * std::mem::size_of::<(Triple, BTreeSet<ElementId>)>(),
})
}
pub async fn stats(&self) -> CrdtStatsReport {
let stats = self.stats.read().await;
CrdtStatsReport {
total_ops: stats.total_ops,
add_ops: stats.add_ops,
remove_ops: stats.remove_ops,
merge_ops: stats.merge_ops,
triple_count: stats.triple_count,
tombstone_count: self.triples.tombstones.len(),
}
}
}
/// Serializable bundle of the deltas buffered by an [`RdfCrdt`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RdfCrdtDelta {
/// Delta of the primary triple set, if one is tracked.
pub triples_delta: Option<OrSetDelta<Triple>>,
/// Deltas of the predicate index, keyed by predicate string.
pub predicate_deltas: HashMap<String, OrSetDelta<Triple>>,
/// Deltas of the subject index, keyed by subject string.
pub subject_deltas: HashMap<String, OrSetDelta<Triple>>,
}
/// Result of one `RdfCrdt::garbage_collect` run.
#[derive(Debug)]
pub struct GcReport {
/// Number of tombstone entries dropped from the primary triple set.
pub tombstones_removed: usize,
/// Estimate in bytes: `tombstones_removed` times the in-memory size of one
/// `(Triple, BTreeSet<ElementId>)` pair (heap contents are not counted).
pub space_reclaimed: usize,
}
/// Public snapshot of an `RdfCrdt`'s counters, as returned by `RdfCrdt::stats`.
#[derive(Debug)]
pub struct CrdtStatsReport {
/// Number of add and remove operations.
pub total_ops: u64,
/// Number of add operations.
pub add_ops: u64,
/// Number of remove operations.
pub remove_ops: u64,
/// Number of merge operations.
pub merge_ops: u64,
/// Live triple count as of the last mutating operation.
pub triple_count: usize,
/// Number of tombstone entries in the primary triple set when the report was taken.
pub tombstone_count: usize,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode, Object};

    /// Builds an `RdfCrdt` replica for `node_id` with default GC and delta settings.
    async fn replica(node_id: &str) -> RdfCrdt {
        let config = CrdtConfig {
            node_id: node_id.to_string(),
            crdt_type: CrdtType::RdfCrdt,
            gc_config: GcConfig::default(),
            delta_config: DeltaConfig::default(),
        };
        RdfCrdt::new(config)
            .await
            .expect("async operation should succeed")
    }

    /// Parses `value` as a named node, panicking on an invalid IRI.
    fn iri(value: &str) -> NamedNode {
        NamedNode::new(value).expect("valid IRI")
    }

    /// Builds a triple with named-node subject and predicate and a plain literal object.
    fn literal_triple(subject: &str, predicate: &str, value: &str) -> Triple {
        Triple::new(
            iri(subject),
            iri(predicate),
            Object::Literal(Literal::new(value)),
        )
    }

    /// Merging two grow-only sets yields their union.
    #[tokio::test]
    async fn test_grow_set() {
        let mut left = GrowSet::new();
        let mut right = GrowSet::new();
        left.add(1);
        left.add(2);
        right.add(2);
        right.add(3);

        left.merge(&right);

        for expected in 1..=3 {
            assert!(left.contains(&expected));
        }
        assert_eq!(left.elements().len(), 3);
    }

    /// A local removal does not hide an element that another replica added
    /// under its own tag.
    #[tokio::test]
    async fn test_or_set() {
        let mut left = OrSet::new("node1".to_string());
        let mut right = OrSet::new("node2".to_string());
        left.add(1);
        left.add(2);
        right.add(2);
        right.add(3);
        left.remove(&2);

        left.merge(&right);

        assert!(left.contains(&1));
        assert!(left.contains(&2));
        assert!(left.contains(&3));
    }

    /// Subject queries see added triples and stop seeing removed ones.
    #[tokio::test]
    async fn test_rdf_crdt() {
        let mut crdt = replica("node1").await;
        let triple1 = literal_triple(
            "http://example.org/s1",
            "http://example.org/p1",
            "value1",
        );
        let triple2 = literal_triple(
            "http://example.org/s1",
            "http://example.org/p2",
            "value2",
        );
        crdt.add_triple(triple1.clone())
            .await
            .expect("async operation should succeed");
        crdt.add_triple(triple2.clone())
            .await
            .expect("async operation should succeed");

        let by_subject = TriplePattern::new(
            Some(crate::model::SubjectPattern::NamedNode(iri(
                "http://example.org/s1",
            ))),
            None,
            None,
        );
        let before_removal = crdt
            .query(&by_subject)
            .await
            .expect("async operation should succeed");
        assert_eq!(before_removal.len(), 2);

        crdt.remove_triple(&triple1)
            .await
            .expect("async operation should succeed");
        let after_removal = crdt
            .query(&by_subject)
            .await
            .expect("async operation should succeed");
        assert_eq!(after_removal.len(), 1);
        assert_eq!(after_removal[0], triple2);
    }

    /// Merging two replicas makes the triples of both visible.
    #[tokio::test]
    async fn test_rdf_crdt_merge() {
        let mut crdt1 = replica("node1").await;
        let mut crdt2 = replica("node2").await;
        let triple1 = literal_triple(
            "http://example.org/s1",
            "http://example.org/p1",
            "value1",
        );
        let triple2 = literal_triple(
            "http://example.org/s2",
            "http://example.org/p2",
            "value2",
        );
        crdt1
            .add_triple(triple1.clone())
            .await
            .expect("async operation should succeed");
        crdt2
            .add_triple(triple2.clone())
            .await
            .expect("async operation should succeed");

        crdt1
            .merge(&crdt2)
            .await
            .expect("async operation should succeed");

        let match_all = TriplePattern::new(None, None, None);
        let merged = crdt1
            .query(&match_all)
            .await
            .expect("async operation should succeed");
        assert_eq!(merged.len(), 2);
    }
}