use crate::crdt::{Result, TaskList, TaskListDelta};
use crate::gossip::PubSubManager;
use saorsa_gossip_crdt_sync::AntiEntropyManager;
use saorsa_gossip_types::PeerId;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct TaskListSync {
task_list: Arc<RwLock<TaskList>>,
#[allow(dead_code)]
anti_entropy: AntiEntropyManager<TaskList>,
pubsub: Arc<PubSubManager>,
topic: String,
}
impl TaskListSync {
pub fn new(
task_list: TaskList,
pubsub: Arc<PubSubManager>,
topic: String,
sync_interval_secs: u64,
) -> Result<Self> {
let task_list = Arc::new(RwLock::new(task_list));
let anti_entropy = AntiEntropyManager::new(Arc::clone(&task_list), sync_interval_secs);
Ok(Self {
task_list,
anti_entropy,
pubsub,
topic,
})
}
pub async fn start(&self) -> Result<()> {
let mut sub = self.pubsub.subscribe(self.topic.clone()).await;
let task_list = Arc::clone(&self.task_list);
tokio::spawn(async move {
while let Some(msg) = sub.recv().await {
let decoded = {
use bincode::Options;
bincode::options()
.with_fixint_encoding()
.with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
.allow_trailing_bytes()
.deserialize::<(PeerId, TaskListDelta)>(&msg.payload)
};
match decoded {
Ok((peer_id, delta)) => {
let mut list = task_list.write().await;
if let Err(e) = list.merge_delta(&delta, peer_id) {
tracing::warn!("Failed to merge remote delta: {}", e);
}
}
Err(e) => {
tracing::warn!("Failed to deserialize delta from topic: {}", e);
}
}
}
});
Ok(())
}
pub async fn stop(&self) -> Result<()> {
self.pubsub.unsubscribe(&self.topic).await;
Ok(())
}
pub async fn apply_remote_delta(&self, peer_id: PeerId, delta: TaskListDelta) -> Result<()> {
let mut task_list = self.task_list.write().await;
task_list.merge_delta(&delta, peer_id)?;
Ok(())
}
pub async fn publish_delta(&self, local_peer_id: PeerId, delta: TaskListDelta) -> Result<()> {
let serialized = bincode::serialize(&(local_peer_id, delta)).map_err(|e| {
crate::crdt::CrdtError::Gossip(format!("failed to serialize delta: {e}"))
})?;
self.pubsub
.publish(self.topic.clone(), bytes::Bytes::from(serialized))
.await
.map_err(|e| crate::crdt::CrdtError::Gossip(format!("failed to publish delta: {e}")))?;
Ok(())
}
pub async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, TaskList> {
self.task_list.read().await
}
pub async fn write(&self) -> tokio::sync::RwLockWriteGuard<'_, TaskList> {
self.task_list.write().await
}
#[must_use]
pub fn topic(&self) -> &str {
&self.topic
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::crdt::{TaskId, TaskItem, TaskListId, TaskMetadata};
use crate::identity::AgentId;
fn agent(n: u8) -> AgentId {
AgentId([n; 32])
}
fn peer(n: u8) -> PeerId {
PeerId::new([n; 32])
}
fn list_id(n: u8) -> TaskListId {
TaskListId::new([n; 32])
}
fn make_task(id_byte: u8, peer: PeerId) -> TaskItem {
let agent = agent(1);
let task_id = TaskId::from_bytes([id_byte; 32]);
let metadata = TaskMetadata::new(
format!("Task {}", id_byte),
format!("Description {}", id_byte),
128,
agent,
1000,
);
TaskItem::new(task_id, metadata, peer)
}
#[tokio::test]
async fn test_task_list_sync_creation() {
let peer = peer(1);
let id = list_id(1);
let task_list = TaskList::new(id, "Test List".to_string(), peer);
let _list_for_sync = task_list;
}
#[tokio::test]
async fn test_apply_delta() {
let peer1 = peer(1);
let peer2 = peer(2);
let id = list_id(1);
let task_list = TaskList::new(id, "Test".to_string(), peer1);
let task_list_arc = Arc::new(RwLock::new(task_list));
let mut delta = TaskListDelta::new(1);
let task = make_task(1, peer2);
let task_id = *task.id();
let tag = (peer2, 1);
delta.added_tasks.insert(task_id, (task, tag));
{
let mut list = task_list_arc.write().await;
let result = list.merge_delta(&delta, peer2);
assert!(result.is_ok());
}
{
let list = task_list_arc.read().await;
assert_eq!(list.task_count(), 1);
}
}
#[tokio::test]
async fn test_concurrent_access() {
let peer = peer(1);
let id = list_id(1);
let task_list = TaskList::new(id, "Test".to_string(), peer);
let task_list_arc = Arc::new(RwLock::new(task_list));
let list1 = task_list_arc.read().await;
let list2 = task_list_arc.read().await;
assert_eq!(list1.name(), "Test");
assert_eq!(list2.name(), "Test");
drop(list1);
drop(list2);
{
let mut list = task_list_arc.write().await;
list.update_name("Updated".to_string(), peer);
}
let list = task_list_arc.read().await;
assert_eq!(list.name(), "Updated");
}
}