use super::{CRDT, Mergeable, ReplicaId};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::error::Error;
use uuid::Uuid;
/// Error returned by list operations that cannot be carried out.
///
/// The error only carries a human-readable message. Its `Display` output is
/// that message prefixed with `ListError: `.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ListError {
    /// Human-readable description of what went wrong.
    message: String,
}

impl ListError {
    /// Creates an error carrying `message`.
    ///
    /// Anything convertible into a `String` is accepted, so both owned
    /// strings and string literals can be passed directly.
    pub fn new(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
        }
    }
}

impl std::fmt::Display for ListError {
    /// Writes the stored message prefixed with `ListError: `.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ListError: {}", self.message)
    }
}

impl Error for ListError {}
/// Identifier of a single list element: a UUID paired with a replica id.
///
/// The type is hashable and is used as the key of the element maps held by
/// the list types in this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ElementId {
    /// UUID part of the identifier.
    pub id: Uuid,
    /// Replica part of the identifier.
    pub replica: ReplicaId,
}
impl ElementId {
pub fn new(replica: ReplicaId) -> Self {
Self {
id: Uuid::new_v4(),
replica,
}
}
pub fn from_parts(id: Uuid, replica: ReplicaId) -> Self {
Self { id, replica }
}
}
/// Bookkeeping attached to every list element.
///
/// Timestamps are plain `u64` values supplied by the caller; this type does
/// not interpret their unit.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ElementMetadata {
    /// Timestamp given when the metadata was created.
    pub created_at: u64,
    /// Timestamp of the most recent modification (or deletion).
    pub modified_at: u64,
    /// `true` once the element has been marked as deleted.
    pub deleted: bool,
    /// Replica that performed the most recent modification.
    pub last_modified_by: ReplicaId,
}
impl ElementMetadata {
pub fn new(replica: ReplicaId, timestamp: u64) -> Self {
Self {
created_at: timestamp,
modified_at: timestamp,
deleted: false,
last_modified_by: replica,
}
}
pub fn mark_modified(&mut self, replica: ReplicaId, timestamp: u64) {
self.modified_at = timestamp;
self.last_modified_by = replica;
}
pub fn mark_deleted(&mut self, replica: ReplicaId, timestamp: u64) {
self.deleted = true;
self.mark_modified(replica, timestamp);
}
}
/// A value stored in a list together with its identifier and metadata.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ListElement<T> {
    /// Identifier assigned to the element when it was created.
    pub id: ElementId,
    /// The payload carried by the element.
    pub value: T,
    /// Creation / modification / deletion bookkeeping.
    pub metadata: ElementMetadata,
}
impl<T> ListElement<T> {
pub fn new(value: T, replica: ReplicaId, timestamp: u64) -> Self {
Self {
id: ElementId::new(replica),
value,
metadata: ElementMetadata::new(replica, timestamp),
}
}
pub fn mark_modified(&mut self, replica: ReplicaId, timestamp: u64) {
self.metadata.mark_modified(replica, timestamp);
}
pub fn mark_deleted(&mut self, replica: ReplicaId, timestamp: u64) {
self.metadata.mark_deleted(replica, timestamp);
}
}
/// Names the conflict-resolution policy a list is configured with.
///
/// NOTE(review): in the code visible in this file the strategy is only
/// stored inside [`ListConfig`]; none of the merge implementations read it.
/// Confirm whether other modules act on it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ListStrategy {
    /// Default strategy of [`ListConfig`]; presumably concurrent adds beat
    /// removes (verify against the intended design).
    AddWins,
    /// Presumably concurrent removes beat adds (verify against the intended
    /// design).
    RemoveWins,
    /// Presumably the most recently written copy wins (verify against the
    /// intended design).
    LastWriteWins,
}
/// Configuration carried by each list instance.
///
/// NOTE(review): the list operations visible in this file store the
/// configuration but do not consult `preserve_deleted` or `max_elements`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ListConfig {
    /// Conflict-resolution policy the list is configured with.
    pub strategy: ListStrategy,
    /// Whether deleted elements are meant to be kept around.
    pub preserve_deleted: bool,
    /// Optional upper bound on the number of elements; `None` means no bound
    /// is configured.
    pub max_elements: Option<usize>,
}
impl Default for ListConfig {
fn default() -> Self {
Self {
strategy: ListStrategy::AddWins,
preserve_deleted: true,
max_elements: None,
}
}
}
/// List replica that keeps its elements in a map keyed by [`ElementId`].
///
/// Removing an element flags it as deleted instead of dropping it, so the
/// map can hold both visible and deleted elements.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AddWinsList<T> {
    /// Configuration this list was created with.
    config: ListConfig,
    /// Every element known to this replica, deleted ones included.
    elements: HashMap<ElementId, ListElement<T>>,
    /// Id of the replica that owns this list instance.
    replica: ReplicaId,
}
impl<T: Clone + PartialEq + Eq + Send + Sync> AddWinsList<T> {
pub fn new(replica: ReplicaId) -> Self {
Self {
config: ListConfig::default(),
elements: HashMap::new(),
replica,
}
}
pub fn with_config(replica: ReplicaId, config: ListConfig) -> Self {
Self {
config,
elements: HashMap::new(),
replica,
}
}
pub fn add(&mut self, value: T, timestamp: u64) -> ElementId {
let element = ListElement::new(value, self.replica, timestamp);
let id = element.id.clone();
self.elements.insert(id.clone(), element);
id
}
pub fn update(&mut self, id: &ElementId, value: T, timestamp: u64) -> Result<(), ListError> {
if let Some(element) = self.elements.get_mut(id) {
element.value = value;
element.mark_modified(self.replica, timestamp);
Ok(())
} else {
Err(ListError::new("Element not found".to_string()))
}
}
pub fn remove(&mut self, id: &ElementId, timestamp: u64) -> Result<(), ListError> {
if let Some(element) = self.elements.get_mut(id) {
element.mark_deleted(self.replica, timestamp);
Ok(())
} else {
Err(ListError::new("Element not found".to_string()))
}
}
pub fn get(&self, id: &ElementId) -> Option<&ListElement<T>> {
self.elements.get(id)
}
pub fn visible_elements(&self) -> Vec<&ListElement<T>> {
self.elements
.values()
.filter(|e| !e.metadata.deleted)
.collect()
}
pub fn all_elements(&self) -> Vec<&ListElement<T>> {
self.elements.values().collect()
}
pub fn contains(&self, id: &ElementId) -> bool {
self.elements.contains_key(id)
}
pub fn len(&self) -> usize {
self.visible_elements().len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn clear(&mut self) {
self.elements.clear();
}
}
impl<T: Clone + PartialEq + Eq + Send + Sync> CRDT for AddWinsList<T> {
    /// Returns the id of the replica that owns this list.
    fn replica_id(&self) -> &ReplicaId {
        let Self { replica, .. } = self;
        replica
    }
}
impl<T: Clone + PartialEq + Eq + Send + Sync> Mergeable for AddWinsList<T> {
    type Error = ListError;

    /// Folds the elements of `other` into this list.
    ///
    /// An incoming element is adopted when its id is unknown locally or when
    /// its `modified_at` is strictly greater than the local copy's. This
    /// implementation always returns `Ok(())`.
    ///
    /// NOTE(review): on equal `modified_at` the local copy is kept, so two
    /// replicas holding different copies with the same timestamp each keep
    /// their own; `has_conflict` can be used to detect that situation.
    fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
        for (id, incoming) in &other.elements {
            let adopt = match self.elements.get(id) {
                Some(local) => incoming.metadata.modified_at > local.metadata.modified_at,
                None => true,
            };
            if adopt {
                self.elements.insert(id.clone(), incoming.clone());
            }
        }
        Ok(())
    }

    /// Reports whether some element stored in both lists carries the same
    /// `modified_at` on both sides but was last modified by different
    /// replicas.
    fn has_conflict(&self, other: &Self) -> bool {
        other
            .elements
            .iter()
            .filter_map(|(id, incoming)| self.elements.get(id).map(|local| (local, incoming)))
            .any(|(local, incoming)| {
                incoming.metadata.modified_at == local.metadata.modified_at
                    && incoming.metadata.last_modified_by != local.metadata.last_modified_by
            })
    }
}
/// List replica that keeps its elements in a map keyed by [`ElementId`] and
/// drops an element from the map when it is removed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RemoveWinsList<T> {
    /// Configuration this list was created with.
    config: ListConfig,
    /// Elements currently stored by this replica.
    elements: HashMap<ElementId, ListElement<T>>,
    /// Id of the replica that owns this list instance.
    replica: ReplicaId,
}
impl<T: Clone + PartialEq + Eq + Send + Sync> RemoveWinsList<T> {
pub fn new(replica: ReplicaId) -> Self {
Self {
config: ListConfig {
strategy: ListStrategy::RemoveWins,
preserve_deleted: false,
max_elements: None,
},
elements: HashMap::new(),
replica,
}
}
pub fn with_config(replica: ReplicaId, config: ListConfig) -> Self {
Self {
config,
elements: HashMap::new(),
replica,
}
}
pub fn add(&mut self, value: T, timestamp: u64) -> ElementId {
let element = ListElement::new(value, self.replica, timestamp);
let id = element.id.clone();
self.elements.insert(id.clone(), element);
id
}
pub fn update(&mut self, id: &ElementId, value: T, timestamp: u64) -> Result<(), ListError> {
if let Some(element) = self.elements.get_mut(id) {
element.value = value;
element.mark_modified(self.replica, timestamp);
Ok(())
} else {
Err(ListError::new("Element not found".to_string()))
}
}
pub fn remove(&mut self, id: &ElementId) -> Result<(), ListError> {
if self.elements.remove(id).is_some() {
Ok(())
} else {
Err(ListError::new("Element not found".to_string()))
}
}
pub fn get(&self, id: &ElementId) -> Option<&ListElement<T>> {
self.elements.get(id)
}
pub fn elements(&self) -> Vec<&ListElement<T>> {
self.elements.values().collect()
}
pub fn contains(&self, id: &ElementId) -> bool {
self.elements.contains_key(id)
}
pub fn len(&self) -> usize {
self.elements.len()
}
pub fn is_empty(&self) -> bool {
self.elements.is_empty()
}
pub fn clear(&mut self) {
self.elements.clear();
}
}
impl<T: Clone + PartialEq + Eq + Send + Sync> CRDT for RemoveWinsList<T> {
    /// Returns the id of the replica that owns this list.
    fn replica_id(&self) -> &ReplicaId {
        let Self { replica, .. } = self;
        replica
    }
}
impl<T: Clone + PartialEq + Eq + Send + Sync> Mergeable for RemoveWinsList<T> {
    type Error = ListError;

    /// Folds the elements of `other` into this list.
    ///
    /// An incoming element is adopted when its id is not stored locally or
    /// when its `modified_at` is strictly greater than the local copy's.
    /// This implementation always returns `Ok(())`.
    ///
    /// NOTE(review): an id that is missing locally is always inserted, so an
    /// element removed on this replica comes back if `other` still has it.
    fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
        for (id, incoming) in &other.elements {
            let adopt = match self.elements.get(id) {
                Some(local) => incoming.metadata.modified_at > local.metadata.modified_at,
                None => true,
            };
            if adopt {
                self.elements.insert(id.clone(), incoming.clone());
            }
        }
        Ok(())
    }

    /// Reports whether some element stored in both lists carries the same
    /// `modified_at` on both sides but was last modified by different
    /// replicas.
    fn has_conflict(&self, other: &Self) -> bool {
        other
            .elements
            .iter()
            .filter_map(|(id, incoming)| self.elements.get(id).map(|local| (local, incoming)))
            .any(|(local, incoming)| {
                incoming.metadata.modified_at == local.metadata.modified_at
                    && incoming.metadata.last_modified_by != local.metadata.last_modified_by
            })
    }
}
/// List replica that keeps its elements in a map keyed by [`ElementId`].
///
/// Removing an element flags it as deleted instead of dropping it, so the
/// map can hold both visible and deleted elements.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LwwList<T> {
    /// Configuration this list was created with.
    config: ListConfig,
    /// Every element known to this replica, deleted ones included.
    elements: HashMap<ElementId, ListElement<T>>,
    /// Id of the replica that owns this list instance.
    replica: ReplicaId,
}
impl<T: Clone + PartialEq + Eq + Send + Sync> LwwList<T> {
pub fn new(replica: ReplicaId) -> Self {
Self {
config: ListConfig {
strategy: ListStrategy::LastWriteWins,
preserve_deleted: true,
max_elements: None,
},
elements: HashMap::new(),
replica,
}
}
pub fn with_config(replica: ReplicaId, config: ListConfig) -> Self {
Self {
config,
elements: HashMap::new(),
replica,
}
}
pub fn add(&mut self, value: T, timestamp: u64) -> ElementId {
let element = ListElement::new(value, self.replica, timestamp);
let id = element.id.clone();
self.elements.insert(id.clone(), element);
id
}
pub fn update(&mut self, id: &ElementId, value: T, timestamp: u64) -> Result<(), ListError> {
if let Some(element) = self.elements.get_mut(id) {
element.value = value;
element.mark_modified(self.replica, timestamp);
Ok(())
} else {
Err(ListError::new("Element not found".to_string()))
}
}
pub fn remove(&mut self, id: &ElementId, timestamp: u64) -> Result<(), ListError> {
if let Some(element) = self.elements.get_mut(id) {
element.mark_deleted(self.replica, timestamp);
Ok(())
} else {
Err(ListError::new("Element not found".to_string()))
}
}
pub fn get(&self, id: &ElementId) -> Option<&ListElement<T>> {
self.elements.get(id)
}
pub fn visible_elements(&self) -> Vec<&ListElement<T>> {
self.elements
.values()
.filter(|e| !e.metadata.deleted)
.collect()
}
pub fn all_elements(&self) -> Vec<&ListElement<T>> {
self.elements.values().collect()
}
pub fn contains(&self, id: &ElementId) -> bool {
self.elements.contains_key(id)
}
pub fn len(&self) -> usize {
self.visible_elements().len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn clear(&mut self) {
self.elements.clear();
}
}
impl<T: Clone + PartialEq + Eq + Send + Sync> CRDT for LwwList<T> {
    /// Returns the id of the replica that owns this list.
    fn replica_id(&self) -> &ReplicaId {
        let Self { replica, .. } = self;
        replica
    }
}
impl<T: Clone + PartialEq + Eq + Send + Sync> Mergeable for LwwList<T> {
    type Error = ListError;

    /// Folds the elements of `other` into this list.
    ///
    /// An incoming element is adopted when its id is unknown locally or when
    /// its `modified_at` is strictly greater than the local copy's. This
    /// implementation always returns `Ok(())`.
    ///
    /// NOTE(review): on equal `modified_at` the local copy is kept, so two
    /// replicas holding different copies with the same timestamp each keep
    /// their own; `has_conflict` can be used to detect that situation.
    fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
        for (id, incoming) in &other.elements {
            let adopt = match self.elements.get(id) {
                Some(local) => incoming.metadata.modified_at > local.metadata.modified_at,
                None => true,
            };
            if adopt {
                self.elements.insert(id.clone(), incoming.clone());
            }
        }
        Ok(())
    }

    /// Reports whether some element stored in both lists carries the same
    /// `modified_at` on both sides but was last modified by different
    /// replicas.
    fn has_conflict(&self, other: &Self) -> bool {
        other
            .elements
            .iter()
            .filter_map(|(id, incoming)| self.elements.get(id).map(|local| (local, incoming)))
            .any(|(local, incoming)| {
                incoming.metadata.modified_at == local.metadata.modified_at
                    && incoming.metadata.last_modified_by != local.metadata.last_modified_by
            })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use super::super::ReplicaId;
    use uuid::Uuid;

    /// Builds a replica id from the UUID made of the `u64` pair `(0, id)`, so
    /// tests get distinct, reproducible replicas.
    fn create_replica(id: u64) -> ReplicaId {
        ReplicaId::from(Uuid::from_u64_pair(0, id))
    }

    #[test]
    fn test_element_id_creation() {
        let replica = create_replica(1);
        let element_id = ElementId::new(replica);
        assert_eq!(element_id.replica, replica);
        assert_ne!(element_id.id, Uuid::nil());
    }

    #[test]
    fn test_list_element_creation() {
        let replica = create_replica(1);
        let timestamp = 1234567890;
        let element = ListElement::new("test_value", replica, timestamp);
        assert_eq!(element.value, "test_value");
        assert_eq!(element.metadata.created_at, timestamp);
        assert_eq!(element.metadata.modified_at, timestamp);
        assert!(!element.metadata.deleted);
        assert_eq!(element.metadata.last_modified_by, replica);
    }

    #[test]
    fn test_add_wins_list_basic_operations() {
        let replica = create_replica(1);
        let mut list = AddWinsList::new(replica);
        let id1 = list.add("first", 1000);
        let id2 = list.add("second", 2000);
        assert_eq!(list.len(), 2);
        assert!(list.contains(&id1));
        assert!(list.contains(&id2));
        list.update(&id1, "updated_first", 3000).unwrap();
        assert_eq!(list.get(&id1).unwrap().value, "updated_first");
        list.remove(&id2, 4000).unwrap();
        // The removed element is hidden from `len` but kept as a tombstone.
        assert_eq!(list.len(), 1);
        assert!(list.get(&id2).unwrap().metadata.deleted);
    }

    #[test]
    fn test_remove_wins_list_basic_operations() {
        let replica = create_replica(1);
        let mut list = RemoveWinsList::new(replica);
        let id1 = list.add("first", 1000);
        let id2 = list.add("second", 2000);
        assert_eq!(list.len(), 2);
        list.remove(&id2).unwrap();
        assert_eq!(list.len(), 1);
        // Only the removed element disappears; the other one is untouched.
        assert!(list.contains(&id1));
        assert!(!list.contains(&id2));
    }

    #[test]
    fn test_lww_list_basic_operations() {
        let replica = create_replica(1);
        let mut list = LwwList::new(replica);
        let id1 = list.add("first", 1000);
        let id2 = list.add("second", 2000);
        assert_eq!(list.len(), 2);
        list.update(&id1, "updated_first", 3000).unwrap();
        assert_eq!(list.get(&id1).unwrap().value, "updated_first");
        list.remove(&id2, 4000).unwrap();
        assert_eq!(list.len(), 1);
        assert!(list.get(&id2).unwrap().metadata.deleted);
    }

    #[test]
    fn test_list_merge() {
        let replica1 = create_replica(1);
        let replica2 = create_replica(2);
        let mut list1 = AddWinsList::new(replica1);
        let mut list2 = AddWinsList::new(replica2);
        let id1 = list1.add("from_replica1", 1000);
        let id2 = list2.add("from_replica2", 2000);
        list1.merge(&list2).unwrap();
        assert_eq!(list1.len(), 2);
        assert!(list1.contains(&id1));
        assert!(list1.contains(&id2));
    }

    #[test]
    fn test_list_merge_conflict_resolution() {
        let replica1 = create_replica(1);
        let replica2 = create_replica(2);
        let mut list1 = AddWinsList::new(replica1);
        let mut list2 = AddWinsList::new(replica2);
        let id = list1.add("value1", 1000);
        // Simulate replica 2 holding a newer copy of the same element; the
        // copy carries the shared id so it matches the key it is stored under.
        let mut newer = ListElement::new("value2", replica2, 2000);
        newer.id = id.clone();
        list2.elements.insert(id.clone(), newer);
        list1.merge(&list2).unwrap();
        assert_eq!(list1.get(&id).unwrap().value, "value2");
    }

    #[test]
    fn test_list_configuration() {
        let replica = create_replica(1);
        let config = ListConfig {
            strategy: ListStrategy::RemoveWins,
            preserve_deleted: false,
            max_elements: Some(100),
        };
        let list: AddWinsList<String> = AddWinsList::with_config(replica, config);
        assert_eq!(list.config.strategy, ListStrategy::RemoveWins);
        assert_eq!(list.config.max_elements, Some(100));
    }
}