use super::common::{PositionId, AdvancedCrdtError};
use super::super::{CRDT, Mergeable, ReplicaId};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// A single slot of an [`Rga`]: a value, its identifier, and its tombstone flag.
///
/// Elements are never removed from the owning map by the code in this file;
/// deletion only flips `visible` to `false`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RgaElement<T> {
/// Identifier of this element; `Rga` also uses it as the element's map key.
pub position: PositionId,
/// The payload stored at this position.
pub value: T,
/// `true` while the element is live, `false` once it has been deleted (tombstoned).
pub visible: bool,
/// Position this element was inserted after, or `None` when no anchor was given.
pub prev: Option<PositionId>,
}
impl<T> RgaElement<T> {
pub fn new(position: PositionId, value: T, prev: Option<PositionId>) -> Self {
Self {
position,
value,
visible: true,
prev,
}
}
}
/// Replicated Growable Array: a collection of [`RgaElement`]s keyed by position.
///
/// Deleted elements stay in `elements` as tombstones (`visible == false`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Rga<T> {
/// Replica that owns this instance; cloned into every `PositionId` it creates.
replica_id: ReplicaId,
/// Every element known to this replica, tombstones included, keyed by its position.
elements: HashMap<PositionId, RgaElement<T>>,
/// Incremented on every insert and passed as the second argument to `PositionId::new`.
timestamp_counter: u64,
/// Incremented on every insert and passed as the third argument to `PositionId::new`.
disambiguation_counter: u64,
}
impl<T: Clone + PartialEq> Rga<T> {
pub fn new(replica_id: ReplicaId) -> Self {
Self {
replica_id,
elements: HashMap::new(),
timestamp_counter: 0,
disambiguation_counter: 0,
}
}
pub fn insert_after(&mut self, value: T, after: Option<PositionId>) -> Result<PositionId, AdvancedCrdtError> {
self.timestamp_counter += 1;
self.disambiguation_counter += 1;
let position = PositionId::new(
self.replica_id.clone(),
self.timestamp_counter,
self.disambiguation_counter,
);
let element = RgaElement::new(position.clone(), value, after);
self.elements.insert(position.clone(), element);
Ok(position)
}
pub fn delete(&mut self, position: &PositionId) -> Result<(), AdvancedCrdtError> {
if let Some(element) = self.elements.get_mut(position) {
element.visible = false;
Ok(())
} else {
Err(AdvancedCrdtError::ElementNotFound(format!("Position {:?}", position)))
}
}
pub fn to_vec(&self) -> Vec<T> {
let mut result = Vec::new();
let mut elements: Vec<_> = self.elements.values()
.filter(|e| e.visible)
.collect();
elements.sort_by(|a, b| a.position.cmp(&b.position));
for element in elements {
result.push(element.value.clone());
}
result
}
fn find_first_element(&self) -> Option<PositionId> {
self.elements.values()
.find(|e| e.prev.is_none())
.map(|e| e.position.clone())
}
fn find_next_element(&self, position: &PositionId) -> Option<PositionId> {
self.elements.values()
.find(|e| e.prev.as_ref() == Some(position))
.map(|e| e.position.clone())
}
pub fn len(&self) -> usize {
self.elements.len()
}
pub fn is_empty(&self) -> bool {
self.elements.is_empty()
}
}
impl<T: Clone + PartialEq> CRDT for Rga<T> {
    /// Returns the id of the replica that owns this array.
    fn replica_id(&self) -> &ReplicaId {
        let Self { replica_id, .. } = self;
        replica_id
    }
}
impl<T: Clone + PartialEq + Send + Sync> Mergeable for Rga<T> {
    type Error = AdvancedCrdtError;

    /// Folds every element of `other` into `self`.
    ///
    /// * Elements unknown to `self` are copied over unchanged.
    /// * For elements both replicas hold, a deletion on either side wins:
    ///   the merged element is visible only if it is visible in both.
    ///
    /// This implementation always returns `Ok(())`.
    fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
        for (position, other_element) in &other.elements {
            if let Some(self_element) = self.elements.get_mut(position) {
                // Capture the tombstone state before any replacement so a
                // deletion seen by either replica can never be undone.
                let deleted = !self_element.visible || !other_element.visible;

                // NOTE(review): both elements are stored under the same
                // `PositionId`, so this comparison presumably never fires;
                // it is retained from the original logic in case
                // `PositionId` equality does not cover the timestamp.
                if other_element.position.timestamp > self_element.position.timestamp {
                    *self_element = other_element.clone();
                }

                if deleted {
                    self_element.visible = false;
                }
            } else {
                self.elements
                    .insert(position.clone(), other_element.clone());
            }
        }
        Ok(())
    }

    /// Reports whether any position held by both replicas maps to different
    /// values. Visibility differences are not treated as conflicts.
    fn has_conflict(&self, other: &Self) -> bool {
        self.elements.iter().any(|(position, mine)| {
            other
                .elements
                .get(position)
                .map_or(false, |theirs| mine.value != theirs.value)
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use super::super::super::ReplicaId;
    use uuid::Uuid;

    /// Builds a deterministic replica id derived from `id`.
    fn create_replica(id: u64) -> ReplicaId {
        let raw = Uuid::from_u64_pair(0, id);
        ReplicaId::from(raw)
    }

    /// A new array reports its replica id and holds no elements.
    #[test]
    fn test_rga_creation() {
        let owner = create_replica(1);
        let rga: Rga<String> = Rga::new(owner.clone());

        assert_eq!(rga.replica_id(), &owner);
        assert!(rga.is_empty());
        assert_eq!(rga.len(), 0);
    }

    /// Inserting three chained elements and deleting them one by one hides
    /// exactly the deleted values from `to_vec`.
    #[test]
    fn test_rga_insert_and_delete() {
        let mut rga: Rga<String> = Rga::new(create_replica(1));

        let first = rga.insert_after(String::from("hello"), None).unwrap();
        let second = rga
            .insert_after(String::from("world"), Some(first.clone()))
            .unwrap();
        let third = rga
            .insert_after(String::from("!"), Some(second.clone()))
            .unwrap();

        assert_eq!(rga.len(), 3);
        assert_eq!(rga.to_vec(), vec!["hello", "world", "!"]);

        rga.delete(&second).unwrap();
        assert_eq!(rga.to_vec(), vec!["hello", "!"]);

        rga.delete(&first).unwrap();
        assert_eq!(rga.to_vec(), vec!["!"]);

        rga.delete(&third).unwrap();
        assert_eq!(rga.to_vec(), Vec::<String>::new());
    }

    /// Merging brings the other replica's elements into the local array.
    #[test]
    fn test_rga_merge() {
        let mut local: Rga<String> = Rga::new(create_replica(1));
        let mut remote: Rga<String> = Rga::new(create_replica(2));

        let _local_pos = local.insert_after(String::from("hello"), None).unwrap();
        let _remote_pos = remote.insert_after(String::from("world"), None).unwrap();

        local.merge(&remote).unwrap();

        let merged = local.to_vec();
        assert!(merged.iter().any(|item| item == "hello"));
        assert!(merged.iter().any(|item| item == "world"));
    }
}