use crate::crdt::{Result, TaskId, TaskItem, TaskList};
use saorsa_gossip_crdt_sync::DeltaCrdt;
use saorsa_gossip_types::PeerId;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
pub type UniqueTag = (PeerId, u64);
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskListDelta {
pub added_tasks: HashMap<TaskId, (TaskItem, UniqueTag)>,
pub removed_tasks: HashMap<TaskId, HashSet<UniqueTag>>,
pub task_updates: HashMap<TaskId, TaskItem>,
pub ordering_update: Option<Vec<TaskId>>,
pub name_update: Option<String>,
pub version: u64,
}
impl TaskListDelta {
#[must_use]
pub fn new(version: u64) -> Self {
Self {
added_tasks: HashMap::new(),
removed_tasks: HashMap::new(),
task_updates: HashMap::new(),
ordering_update: None,
name_update: None,
version,
}
}
#[must_use]
pub fn for_add(task_id: TaskId, task: TaskItem, tag: UniqueTag, version: u64) -> Self {
let mut delta = Self::new(version);
delta.added_tasks.insert(task_id, (task, tag));
delta
}
#[must_use]
pub fn for_state_change(task_id: TaskId, full_task: TaskItem, version: u64) -> Self {
let mut delta = Self::new(version);
delta.task_updates.insert(task_id, full_task);
delta
}
#[must_use]
pub fn for_reorder(new_order: Vec<TaskId>, version: u64) -> Self {
let mut delta = Self::new(version);
delta.ordering_update = Some(new_order);
delta
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.added_tasks.is_empty()
&& self.removed_tasks.is_empty()
&& self.task_updates.is_empty()
&& self.ordering_update.is_none()
&& self.name_update.is_none()
}
}
impl TaskList {
#[must_use]
pub fn version(&self) -> u64 {
self.current_version()
}
#[must_use]
pub fn delta(&self, since_version: u64) -> Option<TaskListDelta> {
let current_version = self.version();
if since_version >= current_version {
return None;
}
let mut delta = TaskListDelta::new(current_version);
for task in self.tasks_ordered() {
let task_id = *task.id();
let tag = (PeerId::new([0u8; 32]), 0); delta.added_tasks.insert(task_id, (task.clone(), tag));
}
delta.ordering_update = Some(self.tasks_ordered().iter().map(|t| *t.id()).collect());
delta.name_update = Some(self.name().to_string());
Some(delta)
}
pub fn merge_delta(&mut self, delta: &TaskListDelta, peer_id: PeerId) -> Result<()> {
for (task_id, (task, tag)) in &delta.added_tasks {
if self.get_task(task_id).is_none() {
self.add_task(task.clone(), tag.0, tag.1)?;
} else {
if let Some(existing_task) = self.get_task_mut(task_id) {
existing_task.merge(task)?;
}
}
}
for task_id in delta.removed_tasks.keys() {
let _ = self.remove_task(task_id);
}
for (task_id, updated_task) in &delta.task_updates {
if let Some(existing_task) = self.get_task_mut(task_id) {
existing_task.merge(updated_task)?;
} else {
self.add_task(updated_task.clone(), peer_id, 0)?;
}
}
if let Some(ref new_order) = delta.ordering_update {
let valid_order: Vec<TaskId> = new_order
.iter()
.filter(|id| self.get_task(id).is_some())
.copied()
.collect();
if !valid_order.is_empty() {
let _ = self.reorder(valid_order, peer_id);
}
}
if let Some(ref new_name) = delta.name_update {
self.update_name(new_name.clone(), peer_id);
}
Ok(())
}
}
impl DeltaCrdt for TaskList {
type Delta = TaskListDelta;
fn merge(&mut self, delta: &Self::Delta) -> anyhow::Result<()> {
let peer_id = PeerId::new([0u8; 32]);
self.merge_delta(delta, peer_id)
.map_err(|e| anyhow::anyhow!("Failed to merge delta: {}", e))
}
fn delta(&self, since_version: u64) -> Option<Self::Delta> {
self.delta(since_version)
}
fn version(&self) -> u64 {
self.version()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::crdt::{TaskListId, TaskMetadata};
use crate::identity::AgentId;
fn agent(n: u8) -> AgentId {
AgentId([n; 32])
}
fn peer(n: u8) -> PeerId {
PeerId::new([n; 32])
}
fn list_id(n: u8) -> TaskListId {
TaskListId::new([n; 32])
}
fn make_task(id_byte: u8, peer: PeerId) -> TaskItem {
let agent = agent(1);
let task_id = TaskId::from_bytes([id_byte; 32]);
let metadata = TaskMetadata::new(
format!("Task {}", id_byte),
format!("Description {}", id_byte),
128,
agent,
1000,
);
TaskItem::new(task_id, metadata, peer)
}
#[test]
fn test_empty_delta() {
let delta = TaskListDelta::new(1);
assert!(delta.is_empty());
assert_eq!(delta.version, 1);
}
#[test]
fn test_delta_with_added_task() {
let mut delta = TaskListDelta::new(2);
let peer = peer(1);
let task = make_task(1, peer);
let task_id = *task.id();
let tag = (peer, 1);
delta.added_tasks.insert(task_id, (task, tag));
assert!(!delta.is_empty());
assert_eq!(delta.added_tasks.len(), 1);
}
#[test]
fn test_task_list_version() {
let peer = peer(1);
let id = list_id(1);
let mut list = TaskList::new(id, "Test".to_string(), peer);
let initial_version = list.version();
let task = make_task(1, peer);
list.add_task(task, peer, 1).ok().unwrap();
let new_version = list.version();
assert!(new_version > initial_version);
}
#[test]
fn test_delta_generation() {
let peer = peer(1);
let id = list_id(1);
let mut list = TaskList::new(id, "Test".to_string(), peer);
let task = make_task(1, peer);
list.add_task(task, peer, 1).ok().unwrap();
let delta = list.delta(0);
assert!(delta.is_some());
let delta = delta.unwrap();
assert!(!delta.is_empty());
assert!(!delta.added_tasks.is_empty());
}
#[test]
fn test_delta_no_changes() {
let peer = peer(1);
let id = list_id(1);
let list = TaskList::new(id, "Test".to_string(), peer);
let current_version = list.version();
let delta = list.delta(current_version);
assert!(delta.is_none());
}
#[test]
fn test_merge_delta_with_new_task() {
let peer1 = peer(1);
let peer2 = peer(2);
let id = list_id(1);
let mut list1 = TaskList::new(id, "List 1".to_string(), peer1);
let mut list2 = TaskList::new(id, "List 2".to_string(), peer2);
let task = make_task(1, peer2);
list2.add_task(task, peer2, 1).ok().unwrap();
let delta = list2.delta(0).unwrap();
let result = list1.merge_delta(&delta, peer1);
assert!(result.is_ok());
assert_eq!(list1.task_count(), 1);
}
#[test]
fn test_delta_crdt_trait_merge() {
let peer1 = peer(1);
let peer2 = peer(2);
let id = list_id(1);
let mut list1 = TaskList::new(id, "List".to_string(), peer1);
let mut list2 = TaskList::new(id, "List".to_string(), peer2);
let task = make_task(1, peer2);
list2.add_task(task, peer2, 1).ok().unwrap();
let delta = DeltaCrdt::delta(&list2, 0).unwrap();
let result = DeltaCrdt::merge(&mut list1, &delta);
assert!(result.is_ok());
assert!(
DeltaCrdt::version(&list1) > 0,
"version should be bumped after merge"
);
assert_eq!(list1.task_count(), 1);
}
#[test]
fn test_delta_serialization() {
let delta = TaskListDelta::new(5);
let serialized = bincode::serialize(&delta).ok().unwrap();
let deserialized: TaskListDelta = bincode::deserialize(&serialized).ok().unwrap();
assert_eq!(delta.version, deserialized.version);
assert_eq!(delta.is_empty(), deserialized.is_empty());
}
#[test]
fn test_merge_delta_with_ordering_update() {
let peer = peer(1);
let id = list_id(1);
let mut list = TaskList::new(id, "Test".to_string(), peer);
let task1 = make_task(1, peer);
let task2 = make_task(2, peer);
let id1 = *task1.id();
let id2 = *task2.id();
list.add_task(task1, peer, 1).ok().unwrap();
list.add_task(task2, peer, 2).ok().unwrap();
let mut delta = TaskListDelta::new(10);
delta.ordering_update = Some(vec![id2, id1]);
list.merge_delta(&delta, peer).ok().unwrap();
let tasks = list.tasks_ordered();
assert_eq!(tasks[0].id(), &id2);
assert_eq!(tasks[1].id(), &id1);
}
#[test]
fn test_merge_delta_with_name_update() {
let peer = peer(1);
let id = list_id(1);
let mut list = TaskList::new(id, "Original".to_string(), peer);
let mut delta = TaskListDelta::new(5);
delta.name_update = Some("Updated".to_string());
list.merge_delta(&delta, peer).ok().unwrap();
assert_eq!(list.name(), "Updated");
}
}