use crate::crdt::{CheckboxState, CrdtError, Result, TaskId, TaskMetadata};
use crate::identity::AgentId;
use saorsa_gossip_crdt_sync::{LwwRegister, OrSet};
use saorsa_gossip_types::PeerId;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskItem {
id: TaskId,
checkbox: OrSet<CheckboxState>,
title: LwwRegister<String>,
description: LwwRegister<String>,
assignee: LwwRegister<Option<AgentId>>,
priority: LwwRegister<u8>,
created_by: AgentId,
created_at: u64,
}
impl TaskItem {
#[must_use]
pub fn new(id: TaskId, metadata: TaskMetadata, _peer_id: PeerId) -> Self {
Self {
id,
checkbox: OrSet::new(),
title: LwwRegister::new(metadata.title),
description: LwwRegister::new(metadata.description),
assignee: LwwRegister::new(None),
priority: LwwRegister::new(metadata.priority),
created_by: metadata.created_by,
created_at: metadata.created_at,
}
}
#[must_use]
pub fn id(&self) -> &TaskId {
&self.id
}
#[must_use]
pub fn created_by(&self) -> &AgentId {
&self.created_by
}
#[must_use]
pub fn created_at(&self) -> u64 {
self.created_at
}
#[must_use]
pub fn title(&self) -> &str {
self.title.get()
}
#[must_use]
pub fn description(&self) -> &str {
self.description.get()
}
#[must_use]
pub fn assignee(&self) -> Option<&AgentId> {
self.assignee.get().as_ref()
}
#[must_use]
pub fn priority(&self) -> u8 {
*self.priority.get()
}
pub fn claim(&mut self, agent_id: AgentId, peer_id: PeerId, seq: u64) -> Result<()> {
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| CrdtError::SystemClock(format!("clock before Unix epoch: {e}")))?
.as_millis() as u64;
let current = self.current_state();
if current.is_done() {
return Err(CrdtError::InvalidStateTransition {
current,
attempted: CheckboxState::Claimed {
agent_id,
timestamp,
},
});
}
let claimed_state = CheckboxState::Claimed {
agent_id,
timestamp, };
let tag = (peer_id, seq); self.checkbox
.add(claimed_state, tag)
.map_err(|e| CrdtError::Merge(format!("Failed to add claimed state: {}", e)))?;
Ok(())
}
pub fn complete(&mut self, agent_id: AgentId, peer_id: PeerId, seq: u64) -> Result<()> {
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| CrdtError::SystemClock(format!("clock before Unix epoch: {e}")))?
.as_millis() as u64;
let current = self.current_state();
if current.is_empty() {
return Err(CrdtError::InvalidStateTransition {
current,
attempted: CheckboxState::Done {
agent_id,
timestamp,
},
});
}
if current.is_done() {
return Err(CrdtError::InvalidStateTransition {
current,
attempted: CheckboxState::Done {
agent_id,
timestamp,
},
});
}
let done_state = CheckboxState::Done {
agent_id,
timestamp, };
let tag = (peer_id, seq); self.checkbox
.add(done_state, tag)
.map_err(|e| CrdtError::Merge(format!("Failed to add done state: {}", e)))?;
Ok(())
}
pub fn update_title(&mut self, title: String, peer_id: PeerId) {
self.title.set(title, peer_id);
}
pub fn update_description(&mut self, description: String, peer_id: PeerId) {
self.description.set(description, peer_id);
}
pub fn update_assignee(&mut self, assignee: Option<AgentId>, peer_id: PeerId) {
self.assignee.set(assignee, peer_id);
}
pub fn update_priority(&mut self, priority: u8, peer_id: PeerId) {
self.priority.set(priority, peer_id);
}
#[must_use]
pub fn current_state(&self) -> CheckboxState {
let states = self.checkbox.elements();
if states.is_empty() {
return CheckboxState::Empty;
}
let done_states: Vec<_> = states.iter().filter(|s| s.is_done()).collect();
if !done_states.is_empty() {
return done_states
.into_iter()
.min()
.map(|s| (*s).clone())
.unwrap_or(CheckboxState::Empty);
}
let claimed_states: Vec<_> = states.iter().filter(|s| s.is_claimed()).collect();
if !claimed_states.is_empty() {
return claimed_states
.into_iter()
.min()
.map(|s| (*s).clone())
.unwrap_or(CheckboxState::Empty);
}
CheckboxState::Empty
}
pub fn merge(&mut self, other: &TaskItem) -> Result<()> {
if self.id != other.id {
return Err(CrdtError::Merge(format!(
"Cannot merge tasks with different IDs: {} != {}",
self.id, other.id
)));
}
self.checkbox
.merge_state(&other.checkbox)
.map_err(|e| CrdtError::Merge(format!("Failed to merge checkbox states: {}", e)))?;
self.title.merge(&other.title);
self.description.merge(&other.description);
self.assignee.merge(&other.assignee);
self.priority.merge(&other.priority);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn agent(n: u8) -> AgentId {
AgentId([n; 32])
}
fn peer(n: u8) -> PeerId {
PeerId::new([n; 32])
}
fn make_task(peer: PeerId) -> TaskItem {
let agent = agent(1);
let task_id = TaskId::new("Test task", &agent, 1000);
let metadata = TaskMetadata::new("Test", "Description", 128, agent, 1000);
TaskItem::new(task_id, metadata, peer)
}
#[test]
fn test_task_item_new() {
let peer = peer(1);
let agent = agent(1);
let task_id = TaskId::new("Task", &agent, 1000);
let metadata = TaskMetadata::new("Title", "Desc", 200, agent, 1234567890);
let task = TaskItem::new(task_id, metadata.clone(), peer);
assert_eq!(task.id(), &task_id);
assert_eq!(task.title(), "Title");
assert_eq!(task.description(), "Desc");
assert_eq!(task.priority(), 200);
assert_eq!(task.created_by(), &agent);
assert_eq!(task.created_at(), 1234567890);
assert_eq!(task.assignee(), None);
assert!(task.current_state().is_empty());
}
#[test]
fn test_claim_from_empty() {
let peer = peer(1);
let agent = agent(1);
let mut task = make_task(peer);
let result = task.claim(agent, peer, 1);
assert!(result.is_ok());
assert!(task.current_state().is_claimed());
assert_eq!(task.current_state().claimed_by(), Some(&agent));
}
#[test]
fn test_cannot_claim_done_task() {
let peer = peer(1);
let agent = agent(1);
let mut task = make_task(peer);
task.claim(agent, peer, 1).ok().unwrap();
task.complete(agent, peer, 2).ok().unwrap();
let result = task.claim(agent, peer, 3);
assert!(result.is_err());
match result.unwrap_err() {
CrdtError::InvalidStateTransition { .. } => {}
_ => panic!("Expected InvalidStateTransition"),
}
}
#[test]
fn test_complete_from_claimed() {
let peer = peer(1);
let agent = agent(1);
let mut task = make_task(peer);
task.claim(agent, peer, 1).ok().unwrap();
let result = task.complete(agent, peer, 2);
assert!(result.is_ok());
assert!(task.current_state().is_done());
}
#[test]
fn test_cannot_complete_empty_task() {
let peer = peer(1);
let agent = agent(1);
let mut task = make_task(peer);
let result = task.complete(agent, peer, 1);
assert!(result.is_err());
match result.unwrap_err() {
CrdtError::InvalidStateTransition { .. } => {}
_ => panic!("Expected InvalidStateTransition"),
}
}
#[test]
fn test_cannot_complete_done_task() {
let peer = peer(1);
let agent = agent(1);
let mut task = make_task(peer);
task.claim(agent, peer, 1).ok().unwrap();
task.complete(agent, peer, 2).ok().unwrap();
let result = task.complete(agent, peer, 3);
assert!(result.is_err());
match result.unwrap_err() {
CrdtError::InvalidStateTransition { .. } => {}
_ => panic!("Expected InvalidStateTransition"),
}
}
#[test]
fn test_concurrent_claims() {
let peer1 = peer(1);
let peer2 = peer(2);
let agent1 = agent(1);
let agent2 = agent(2);
let mut task1 = make_task(peer1);
let mut task2 = make_task(peer1);
task1.claim(agent1, peer1, 100).ok().unwrap();
task2.claim(agent2, peer2, 200).ok().unwrap();
task1.merge(&task2).ok().unwrap();
let state = task1.current_state();
assert!(state.is_claimed());
assert!(state.claimed_by().is_some());
assert!(state.timestamp().unwrap() > 1_000_000_000_000); }
#[test]
fn test_concurrent_completes() {
let peer1 = peer(1);
let peer2 = peer(2);
let agent1 = agent(1);
let agent2 = agent(2);
let mut task1 = make_task(peer1);
let mut task2 = make_task(peer1);
task1.claim(agent1, peer1, 50).ok().unwrap();
task2.claim(agent1, peer1, 50).ok().unwrap();
task1.complete(agent1, peer1, 100).ok().unwrap();
task2.complete(agent2, peer2, 200).ok().unwrap();
task1.merge(&task2).ok().unwrap();
let state = task1.current_state();
assert!(state.is_done());
assert!(state.claimed_by().is_some());
assert!(state.timestamp().unwrap() > 1_000_000_000_000); }
#[test]
fn test_update_title() {
let peer = peer(1);
let mut task = make_task(peer);
assert_eq!(task.title(), "Test");
task.update_title("New Title".to_string(), peer);
assert_eq!(task.title(), "New Title");
}
#[test]
fn test_update_description() {
let peer = peer(1);
let mut task = make_task(peer);
assert_eq!(task.description(), "Description");
task.update_description("New Description".to_string(), peer);
assert_eq!(task.description(), "New Description");
}
#[test]
fn test_update_assignee() {
let peer = peer(1);
let agent = agent(42);
let mut task = make_task(peer);
assert_eq!(task.assignee(), None);
task.update_assignee(Some(agent), peer);
assert_eq!(task.assignee(), Some(&agent));
task.update_assignee(None, peer);
assert_eq!(task.assignee(), None);
}
#[test]
fn test_update_priority() {
let peer = peer(1);
let mut task = make_task(peer);
assert_eq!(task.priority(), 128);
task.update_priority(255, peer);
assert_eq!(task.priority(), 255);
}
#[test]
fn test_metadata_lww_semantics() {
let peer1 = peer(1);
let peer2 = peer(2);
let mut task1 = make_task(peer1);
let mut task2 = make_task(peer1);
task1.update_title("Title from peer1".to_string(), peer1);
task2.update_title("Title from peer2".to_string(), peer2);
task1.merge(&task2).ok().unwrap();
assert!(
task1.title() == "Title from peer1" || task1.title() == "Title from peer2",
"LWW should pick one of the concurrent updates"
);
}
#[test]
fn test_merge_is_idempotent() {
let peer = peer(1);
let agent = agent(1);
let mut task1 = make_task(peer);
let mut task2 = make_task(peer);
task1.claim(agent, peer, 100).ok().unwrap();
task1.update_title("Title".to_string(), peer);
task2.merge(&task1).ok().unwrap();
let state_after_first = task2.current_state();
let title_after_first = task2.title().to_string();
task2.merge(&task1).ok().unwrap();
let state_after_second = task2.current_state();
let title_after_second = task2.title().to_string();
assert_eq!(state_after_first, state_after_second);
assert_eq!(title_after_first, title_after_second);
}
#[test]
fn test_merge_is_commutative() {
let peer1 = peer(1);
let peer2 = peer(2);
let agent1 = agent(1);
let _agent2 = agent(2);
let mut task_a = make_task(peer1);
let mut task_b = make_task(peer1);
task_a.claim(agent1, peer1, 100).ok().unwrap();
task_b.update_title("New Title".to_string(), peer2);
let mut result1 = task_a.clone();
result1.merge(&task_b).ok().unwrap();
let mut result2 = task_b.clone();
result2.merge(&task_a).ok().unwrap();
assert_eq!(result1.current_state(), result2.current_state());
assert_eq!(result1.title(), result2.title());
}
#[test]
fn test_merge_different_task_ids_fails() {
let peer = peer(1);
let agent1 = agent(1);
let agent2 = agent(2);
let task_id1 = TaskId::new("Task 1", &agent1, 1000);
let task_id2 = TaskId::new("Task 2", &agent2, 2000);
let metadata1 = TaskMetadata::new("Task 1", "Desc", 128, agent1, 1000);
let metadata2 = TaskMetadata::new("Task 2", "Desc", 128, agent2, 2000);
let mut task1 = TaskItem::new(task_id1, metadata1, peer);
let task2 = TaskItem::new(task_id2, metadata2, peer);
let result = task1.merge(&task2);
assert!(result.is_err());
match result.unwrap_err() {
CrdtError::Merge(_) => {}
_ => panic!("Expected Merge error"),
}
}
#[test]
fn test_serialization_roundtrip() {
let peer = peer(1);
let agent = agent(42);
let mut task = make_task(peer);
task.claim(agent, peer, 100).ok().unwrap();
task.update_title("Serialized Task".to_string(), peer);
task.update_priority(200, peer);
let serialized = bincode::serialize(&task).ok().unwrap();
let deserialized: TaskItem = bincode::deserialize(&serialized).ok().unwrap();
assert_eq!(task.id(), deserialized.id());
assert_eq!(task.title(), deserialized.title());
assert_eq!(task.priority(), deserialized.priority());
assert_eq!(task.current_state(), deserialized.current_state());
}
}