use crate::crdt::{CrdtError, Result, TaskId, TaskItem};
use crate::identity::AgentId;
use saorsa_gossip_crdt_sync::{LwwRegister, OrSet};
use saorsa_gossip_types::PeerId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TaskListId([u8; 32]);
impl TaskListId {
#[must_use]
pub fn new(bytes: [u8; 32]) -> Self {
Self(bytes)
}
#[must_use]
pub fn as_bytes(&self) -> &[u8; 32] {
&self.0
}
#[must_use]
pub fn from_content(name: &str, creator: &AgentId, timestamp: u64) -> Self {
let mut hasher = blake3::Hasher::new();
hasher.update(name.as_bytes());
hasher.update(creator.as_bytes());
hasher.update(×tamp.to_le_bytes());
let hash = hasher.finalize();
Self(*hash.as_bytes())
}
}
impl std::fmt::Display for TaskListId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex::encode(self.0))
}
}
fn default_seq_counter() -> Arc<AtomicU64> {
Arc::new(AtomicU64::new(0))
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskList {
id: TaskListId,
tasks: OrSet<TaskId>,
task_data: HashMap<TaskId, TaskItem>,
ordering: LwwRegister<Vec<TaskId>>,
name: LwwRegister<String>,
#[serde(default)]
version: u64,
#[serde(skip, default = "default_seq_counter")]
seq_counter: Arc<AtomicU64>,
}
impl TaskList {
#[must_use]
pub fn new(id: TaskListId, name: String, _peer_id: PeerId) -> Self {
Self {
id,
tasks: OrSet::new(),
task_data: HashMap::new(),
ordering: LwwRegister::new(Vec::new()),
name: LwwRegister::new(name),
version: 0,
seq_counter: Arc::new(AtomicU64::new(0)),
}
}
#[must_use]
pub fn current_version(&self) -> u64 {
self.version
}
pub fn next_seq(&self) -> u64 {
self.seq_counter.fetch_add(1, Ordering::Relaxed) + 1
}
#[must_use]
pub fn id(&self) -> &TaskListId {
&self.id
}
#[must_use]
pub fn name(&self) -> &str {
self.name.get()
}
pub fn add_task(&mut self, task: TaskItem, peer_id: PeerId, seq: u64) -> Result<()> {
let task_id = *task.id();
let tag = (peer_id, seq);
self.tasks
.add(task_id, tag)
.map_err(|e| CrdtError::Merge(format!("Failed to add task to OR-Set: {}", e)))?;
if let Some(existing) = self.task_data.get_mut(&task_id) {
existing.merge(&task)?;
} else {
self.task_data.insert(task_id, task);
}
let mut current_order = self.ordering.get().clone();
if !current_order.contains(&task_id) {
current_order.push(task_id);
self.ordering.set(current_order, peer_id);
}
self.version += 1;
Ok(())
}
pub fn remove_task(&mut self, task_id: &TaskId) -> Result<()> {
if !self.task_data.contains_key(task_id) {
return Err(CrdtError::TaskNotFound(*task_id));
}
self.tasks
.remove(task_id)
.map_err(|e| CrdtError::Merge(format!("Failed to remove task from OR-Set: {}", e)))?;
self.task_data.remove(task_id);
self.version += 1;
Ok(())
}
pub fn claim_task(
&mut self,
task_id: &TaskId,
agent_id: AgentId,
peer_id: PeerId,
seq: u64,
) -> Result<()> {
let task = self
.task_data
.get_mut(task_id)
.ok_or(CrdtError::TaskNotFound(*task_id))?;
task.claim(agent_id, peer_id, seq)?;
self.version += 1;
Ok(())
}
pub fn complete_task(
&mut self,
task_id: &TaskId,
agent_id: AgentId,
peer_id: PeerId,
seq: u64,
) -> Result<()> {
let task = self
.task_data
.get_mut(task_id)
.ok_or(CrdtError::TaskNotFound(*task_id))?;
task.complete(agent_id, peer_id, seq)?;
self.version += 1;
Ok(())
}
pub fn reorder(&mut self, new_order: Vec<TaskId>, peer_id: PeerId) -> Result<()> {
for task_id in &new_order {
if !self.task_data.contains_key(task_id) {
return Err(CrdtError::TaskNotFound(*task_id));
}
}
self.ordering.set(new_order, peer_id);
self.version += 1;
Ok(())
}
#[must_use]
pub fn tasks_ordered(&self) -> Vec<&TaskItem> {
use std::collections::HashSet;
let current_order = self.ordering.get();
let or_set_tasks: HashSet<TaskId> = self.tasks.elements().into_iter().copied().collect();
let mut ordered: Vec<&TaskItem> = current_order
.iter()
.filter(|id| or_set_tasks.contains(id)) .filter_map(|id| self.task_data.get(id))
.collect();
for task_id in &or_set_tasks {
if !current_order.contains(task_id) {
if let Some(task) = self.task_data.get(task_id) {
ordered.push(task);
}
}
}
ordered
}
pub fn merge(&mut self, other: &TaskList) -> Result<()> {
if self.id != other.id {
return Err(CrdtError::Merge(format!(
"Cannot merge task lists with different IDs: {} != {}",
self.id, other.id
)));
}
self.tasks
.merge_state(&other.tasks)
.map_err(|e| CrdtError::Merge(format!("Failed to merge task OR-Sets: {}", e)))?;
for (task_id, other_task) in &other.task_data {
if let Some(our_task) = self.task_data.get_mut(task_id) {
our_task.merge(other_task)?;
} else {
self.task_data.insert(*task_id, other_task.clone());
}
}
self.ordering.merge(&other.ordering);
self.name.merge(&other.name);
Ok(())
}
pub fn update_name(&mut self, name: String, peer_id: PeerId) {
self.name.set(name, peer_id);
self.version += 1;
}
#[must_use]
pub fn task_count(&self) -> usize {
self.task_data.len()
}
#[must_use]
pub fn get_task(&self, task_id: &TaskId) -> Option<&TaskItem> {
self.task_data.get(task_id)
}
pub fn get_task_mut(&mut self, task_id: &TaskId) -> Option<&mut TaskItem> {
self.task_data.get_mut(task_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::crdt::TaskMetadata;
fn agent(n: u8) -> AgentId {
AgentId([n; 32])
}
fn peer(n: u8) -> PeerId {
PeerId::new([n; 32])
}
fn list_id(n: u8) -> TaskListId {
TaskListId::new([n; 32])
}
fn make_task(id_byte: u8, peer: PeerId) -> TaskItem {
let agent = agent(1);
let task_id = TaskId::from_bytes([id_byte; 32]);
let metadata = TaskMetadata::new(
format!("Task {}", id_byte),
format!("Description {}", id_byte),
128,
agent,
1000,
);
TaskItem::new(task_id, metadata, peer)
}
#[test]
fn test_task_list_new() {
let peer = peer(1);
let id = list_id(1);
let list = TaskList::new(id, "My List".to_string(), peer);
assert_eq!(list.id(), &id);
assert_eq!(list.name(), "My List");
assert_eq!(list.task_count(), 0);
assert!(list.tasks_ordered().is_empty());
}
#[test]
fn test_add_task() {
let peer = peer(1);
let id = list_id(1);
let mut list = TaskList::new(id, "My List".to_string(), peer);
let task = make_task(1, peer);
let task_id = *task.id();
let result = list.add_task(task, peer, 1);
assert!(result.is_ok());
assert_eq!(list.task_count(), 1);
let tasks = list.tasks_ordered();
assert_eq!(tasks.len(), 1);
assert_eq!(tasks[0].id(), &task_id);
}
#[test]
fn test_remove_task() {
let peer = peer(1);
let id = list_id(1);
let mut list = TaskList::new(id, "My List".to_string(), peer);
let task = make_task(1, peer);
let task_id = *task.id();
list.add_task(task, peer, 1).ok().unwrap();
assert_eq!(list.task_count(), 1);
let result = list.remove_task(&task_id);
assert!(result.is_ok());
assert_eq!(list.task_count(), 0);
}
#[test]
fn test_remove_nonexistent_task() {
let peer = peer(1);
let id = list_id(1);
let mut list = TaskList::new(id, "My List".to_string(), peer);
let task_id = TaskId::from_bytes([99; 32]);
let result = list.remove_task(&task_id);
assert!(result.is_err());
match result.unwrap_err() {
CrdtError::TaskNotFound(_) => {}
_ => panic!("Expected TaskNotFound"),
}
}
#[test]
fn test_claim_task() {
let peer = peer(1);
let agent = agent(1);
let id = list_id(1);
let mut list = TaskList::new(id, "My List".to_string(), peer);
let task = make_task(1, peer);
let task_id = *task.id();
list.add_task(task, peer, 1).ok().unwrap();
let result = list.claim_task(&task_id, agent, peer, 2);
assert!(result.is_ok());
let task = list.get_task(&task_id).unwrap();
assert!(task.current_state().is_claimed());
}
#[test]
fn test_complete_task() {
let peer = peer(1);
let agent = agent(1);
let id = list_id(1);
let mut list = TaskList::new(id, "My List".to_string(), peer);
let task = make_task(1, peer);
let task_id = *task.id();
list.add_task(task, peer, 1).ok().unwrap();
list.claim_task(&task_id, agent, peer, 2).ok().unwrap();
let result = list.complete_task(&task_id, agent, peer, 3);
assert!(result.is_ok());
let task = list.get_task(&task_id).unwrap();
assert!(task.current_state().is_done());
}
#[test]
fn test_reorder_tasks() {
let peer = peer(1);
let id = list_id(1);
let mut list = TaskList::new(id, "My List".to_string(), peer);
let task1 = make_task(1, peer);
let task2 = make_task(2, peer);
let task3 = make_task(3, peer);
let id1 = *task1.id();
let id2 = *task2.id();
let id3 = *task3.id();
list.add_task(task1, peer, 1).ok().unwrap();
list.add_task(task2, peer, 2).ok().unwrap();
list.add_task(task3, peer, 3).ok().unwrap();
let tasks = list.tasks_ordered();
assert_eq!(tasks.len(), 3);
assert_eq!(tasks[0].id(), &id1);
assert_eq!(tasks[1].id(), &id2);
assert_eq!(tasks[2].id(), &id3);
let new_order = vec![id3, id1, id2];
let result = list.reorder(new_order, peer);
assert!(result.is_ok());
let tasks = list.tasks_ordered();
assert_eq!(tasks[0].id(), &id3);
assert_eq!(tasks[1].id(), &id1);
assert_eq!(tasks[2].id(), &id2);
}
#[test]
fn test_reorder_with_invalid_task() {
let peer = peer(1);
let id = list_id(1);
let mut list = TaskList::new(id, "My List".to_string(), peer);
let task = make_task(1, peer);
let task_id = *task.id();
list.add_task(task, peer, 1).ok().unwrap();
let invalid_id = TaskId::from_bytes([99; 32]);
let new_order = vec![task_id, invalid_id];
let result = list.reorder(new_order, peer);
assert!(result.is_err());
match result.unwrap_err() {
CrdtError::TaskNotFound(_) => {}
_ => panic!("Expected TaskNotFound"),
}
}
#[test]
fn test_merge_task_lists() {
let peer1 = peer(1);
let peer2 = peer(2);
let id = list_id(1);
let mut list1 = TaskList::new(id, "List 1".to_string(), peer1);
let mut list2 = TaskList::new(id, "List 2".to_string(), peer2);
let task1 = make_task(1, peer1);
let id1 = *task1.id();
list1.add_task(task1, peer1, 1).ok().unwrap();
let task2 = make_task(2, peer2);
let id2 = *task2.id();
list2.add_task(task2, peer2, 1).ok().unwrap();
let result = list1.merge(&list2);
assert!(result.is_ok());
assert_eq!(list1.task_count(), 2);
assert!(list1.get_task(&id1).is_some());
assert!(list1.get_task(&id2).is_some());
}
#[test]
fn test_merge_with_concurrent_task_modifications() {
let peer1 = peer(1);
let peer2 = peer(2);
let agent1 = agent(1);
let _agent2 = agent(2);
let id = list_id(1);
let mut list1 = TaskList::new(id, "List".to_string(), peer1);
let mut list2 = TaskList::new(id, "List".to_string(), peer2);
let task1 = make_task(1, peer1);
let task2 = make_task(1, peer2);
let task_id = *task1.id();
list1.add_task(task1, peer1, 1).ok().unwrap();
list2.add_task(task2, peer2, 1).ok().unwrap();
list1.claim_task(&task_id, agent1, peer1, 2).ok().unwrap();
list2
.get_task_mut(&task_id)
.unwrap()
.update_title("Updated".to_string(), peer2);
list1.merge(&list2).ok().unwrap();
let task = list1.get_task(&task_id).unwrap();
assert!(task.current_state().is_claimed());
}
#[test]
fn test_merge_different_list_ids_fails() {
let peer = peer(1);
let id1 = list_id(1);
let id2 = list_id(2);
let mut list1 = TaskList::new(id1, "List 1".to_string(), peer);
let list2 = TaskList::new(id2, "List 2".to_string(), peer);
let result = list1.merge(&list2);
assert!(result.is_err());
match result.unwrap_err() {
CrdtError::Merge(_) => {}
_ => panic!("Expected Merge error"),
}
}
#[test]
fn test_tasks_ordered_with_removed_tasks() {
let peer = peer(1);
let id = list_id(1);
let mut list = TaskList::new(id, "My List".to_string(), peer);
let task1 = make_task(1, peer);
let task2 = make_task(2, peer);
let task3 = make_task(3, peer);
let id1 = *task1.id();
let id2 = *task2.id();
let id3 = *task3.id();
list.add_task(task1, peer, 1).ok().unwrap();
list.add_task(task2, peer, 2).ok().unwrap();
list.add_task(task3, peer, 3).ok().unwrap();
list.remove_task(&id2).ok().unwrap();
let tasks = list.tasks_ordered();
assert_eq!(tasks.len(), 2);
assert_eq!(tasks[0].id(), &id1);
assert_eq!(tasks[1].id(), &id3);
}
#[test]
fn test_update_name() {
let peer = peer(1);
let id = list_id(1);
let mut list = TaskList::new(id, "Original".to_string(), peer);
assert_eq!(list.name(), "Original");
list.update_name("Updated".to_string(), peer);
assert_eq!(list.name(), "Updated");
}
#[test]
fn test_task_list_id_from_content() {
let agent = agent(1);
let id1 = TaskListId::from_content("My List", &agent, 1000);
let id2 = TaskListId::from_content("My List", &agent, 1000);
assert_eq!(id1, id2);
let id3 = TaskListId::from_content("Different", &agent, 1000);
assert_ne!(id1, id3); }
#[test]
fn test_task_list_id_display() {
let id = TaskListId::new([42u8; 32]);
let display = format!("{}", id);
assert_eq!(display.len(), 64); assert!(display.chars().all(|c| c.is_ascii_hexdigit()));
}
#[test]
fn test_serialization_roundtrip() {
let peer = peer(1);
let id = list_id(1);
let mut list = TaskList::new(id, "My List".to_string(), peer);
let task = make_task(1, peer);
list.add_task(task, peer, 1).ok().unwrap();
let serialized = bincode::serialize(&list).ok().unwrap();
let deserialized: TaskList = bincode::deserialize(&serialized).ok().unwrap();
assert_eq!(list.id(), deserialized.id());
assert_eq!(list.name(), deserialized.name());
assert_eq!(list.task_count(), deserialized.task_count());
}
#[test]
fn test_next_seq_starts_at_one() {
let list = TaskList::new(list_id(99), "seq test".to_string(), peer(1));
assert_eq!(list.next_seq(), 1);
}
#[test]
fn test_next_seq_is_strictly_monotonic() {
let list = TaskList::new(list_id(99), "seq test".to_string(), peer(1));
let mut prev = 0;
for _ in 0..100 {
let s = list.next_seq();
assert!(s > prev, "seq must be strictly increasing");
prev = s;
}
}
#[test]
fn test_next_seq_survives_clone() {
let list = TaskList::new(list_id(99), "seq test".to_string(), peer(1));
let s1 = list.next_seq();
let cloned = list.clone();
let s2 = cloned.next_seq();
assert_ne!(s1, s2, "clone must share counter via Arc");
assert_eq!(s2, s1 + 1);
}
#[test]
fn test_seq_counter_resets_after_serde() {
let list = TaskList::new(list_id(99), "seq test".to_string(), peer(1));
for _ in 0..10 {
let _ = list.next_seq();
}
let bytes = bincode::serialize(&list).ok().unwrap();
let restored: TaskList = bincode::deserialize(&bytes).ok().unwrap();
assert_eq!(
restored.next_seq(),
1,
"deserialized counter must start fresh"
);
}
#[test]
fn test_rapid_add_tasks_all_survive() {
let p = peer(1);
let mut list = TaskList::new(list_id(99), "rapid".to_string(), p);
for i in 0u8..50 {
let task = make_task(i, p);
let seq = list.next_seq();
list.add_task(task, p, seq).ok().unwrap();
}
assert_eq!(
list.task_count(),
50,
"all 50 tasks must survive; duplicate OR-Set tags would drop some"
);
}
}