Skip to main content

x0x/crdt/
sync.rs

1//! Task list synchronization using anti-entropy gossip.
2//!
3//! This module provides automatic synchronization of TaskLists across peers
4//! using saorsa-gossip's anti-entropy mechanism combined with pub/sub.
5//!
6//! ## Architecture
7//!
8//! - `TaskListSync` wraps a TaskList in Arc<RwLock<>> for concurrent access
9//! - Uses `AntiEntropyManager` for periodic background synchronization
10//! - Publishes deltas to a gossip topic when local changes occur
11//! - Subscribes to the topic to receive and apply remote deltas
12//!
13//! This provides eventual consistency across all peers sharing the same topic.
14
15use crate::crdt::{Result, TaskList, TaskListDelta};
16use crate::gossip::PubSubManager;
17use saorsa_gossip_crdt_sync::AntiEntropyManager;
18use saorsa_gossip_types::PeerId;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
22/// Synchronization wrapper for a TaskList.
23///
24/// Manages automatic background synchronization of a TaskList using
25/// anti-entropy gossip. Changes are propagated via deltas published
26/// to a gossip topic.
27pub struct TaskListSync {
28    /// The task list being synchronized (wrapped for concurrent access).
29    task_list: Arc<RwLock<TaskList>>,
30
31    /// Anti-entropy manager for periodic sync.
32    #[allow(dead_code)]
33    anti_entropy: AntiEntropyManager<TaskList>,
34
35    /// Pub/sub manager for topic-based messaging.
36    pubsub: Arc<PubSubManager>,
37
38    /// Topic name for this task list.
39    topic: String,
40}
41
42impl TaskListSync {
43    /// Create a new TaskList synchronization manager.
44    ///
45    /// # Arguments
46    ///
47    /// * `task_list` - The TaskList to synchronize
48    /// * `pubsub` - Pub/sub manager for gossip messaging
49    /// * `topic` - Topic name for pub/sub (typically task list ID)
50    /// * `sync_interval_secs` - How often to run anti-entropy (seconds)
51    ///
52    /// # Returns
53    ///
54    /// A new TaskListSync instance ready to start.
55    ///
56    /// # Errors
57    ///
58    /// Returns an error if initialization fails.
59    ///
60    /// # Example
61    ///
62    /// ```ignore
63    /// let task_list = TaskList::new(id, "My List".to_string(), peer_id);
64    /// let sync = TaskListSync::new(
65    ///     task_list,
66    ///     pubsub,
67    ///     "tasklist-abc123".to_string(),
68    ///     30, // Sync every 30 seconds
69    /// )?;
70    /// ```
71    pub fn new(
72        task_list: TaskList,
73        pubsub: Arc<PubSubManager>,
74        topic: String,
75        sync_interval_secs: u64,
76    ) -> Result<Self> {
77        // Wrap task list for concurrent access
78        let task_list = Arc::new(RwLock::new(task_list));
79
80        // Create anti-entropy manager
81        let anti_entropy = AntiEntropyManager::new(Arc::clone(&task_list), sync_interval_secs);
82
83        Ok(Self {
84            task_list,
85            anti_entropy,
86            pubsub,
87            topic,
88        })
89    }
90
91    /// Start background synchronization.
92    ///
93    /// Subscribes to the gossip topic and begins receiving remote deltas.
94    /// This method returns immediately; synchronization runs in the background.
95    ///
96    /// # Returns
97    ///
98    /// Ok(()) if started successfully.
99    ///
100    /// # Errors
101    ///
102    /// Returns an error if subscription or anti-entropy startup fails.
103    pub async fn start(&self) -> Result<()> {
104        // Subscribe to topic — received messages will contain serialized deltas.
105        // The background task applies them via apply_remote_delta.
106        let mut sub = self.pubsub.subscribe(self.topic.clone()).await;
107        let task_list = Arc::clone(&self.task_list);
108
109        tokio::spawn(async move {
110            while let Some(msg) = sub.recv().await {
111                let decoded = {
112                    use bincode::Options;
113                    bincode::options()
114                        .with_fixint_encoding()
115                        .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
116                        .allow_trailing_bytes()
117                        .deserialize::<(PeerId, TaskListDelta)>(&msg.payload)
118                };
119                match decoded {
120                    Ok((peer_id, delta)) => {
121                        let mut list = task_list.write().await;
122                        if let Err(e) = list.merge_delta(&delta, peer_id) {
123                            tracing::warn!("Failed to merge remote delta: {}", e);
124                        }
125                    }
126                    Err(e) => {
127                        tracing::warn!("Failed to deserialize delta from topic: {}", e);
128                    }
129                }
130            }
131        });
132
133        Ok(())
134    }
135
136    /// Stop background synchronization.
137    ///
138    /// Unsubscribes from the gossip topic.
139    ///
140    /// # Returns
141    ///
142    /// Ok(()) if stopped successfully.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if operations fail.
147    pub async fn stop(&self) -> Result<()> {
148        self.pubsub.unsubscribe(&self.topic).await;
149        Ok(())
150    }
151
152    /// Apply a delta received from a remote peer.
153    ///
154    /// This is called when a delta is received via the gossip topic.
155    /// The delta is merged into the local TaskList using CRDT semantics.
156    ///
157    /// # Arguments
158    ///
159    /// * `peer_id` - The peer who sent this delta
160    /// * `delta` - The delta to apply
161    ///
162    /// # Returns
163    ///
164    /// Ok(()) if the delta was applied successfully.
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if the merge fails.
169    pub async fn apply_remote_delta(&self, peer_id: PeerId, delta: TaskListDelta) -> Result<()> {
170        let mut task_list = self.task_list.write().await;
171        task_list.merge_delta(&delta, peer_id)?;
172        Ok(())
173    }
174
175    /// Publish a local delta to the gossip network.
176    ///
177    /// Call this after making local changes to propagate them to other peers.
178    ///
179    /// # Arguments
180    ///
181    /// * `local_peer_id` - The local peer's ID
182    /// * `delta` - The delta to publish
183    ///
184    /// # Returns
185    ///
186    /// Ok(()) if published successfully.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if serialization or publishing fails.
191    pub async fn publish_delta(&self, local_peer_id: PeerId, delta: TaskListDelta) -> Result<()> {
192        let serialized = bincode::serialize(&(local_peer_id, delta)).map_err(|e| {
193            crate::crdt::CrdtError::Gossip(format!("failed to serialize delta: {e}"))
194        })?;
195
196        self.pubsub
197            .publish(self.topic.clone(), bytes::Bytes::from(serialized))
198            .await
199            .map_err(|e| crate::crdt::CrdtError::Gossip(format!("failed to publish delta: {e}")))?;
200
201        Ok(())
202    }
203
204    /// Get a read-only reference to the task list.
205    ///
206    /// Useful for querying the current state without modifying it.
207    ///
208    /// # Returns
209    ///
210    /// A read guard to the TaskList.
211    pub async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, TaskList> {
212        self.task_list.read().await
213    }
214
215    /// Get a mutable reference to the task list.
216    ///
217    /// Use this to make local changes. After modifying, call `publish_delta`
218    /// to propagate changes to peers.
219    ///
220    /// # Returns
221    ///
222    /// A write guard to the TaskList.
223    pub async fn write(&self) -> tokio::sync::RwLockWriteGuard<'_, TaskList> {
224        self.task_list.write().await
225    }
226
227    /// Get the topic name for this task list.
228    #[must_use]
229    pub fn topic(&self) -> &str {
230        &self.topic
231    }
232}
233
234#[cfg(test)]
235mod tests {
236    use super::*;
237    use crate::crdt::{TaskId, TaskItem, TaskListId, TaskMetadata};
238    use crate::identity::AgentId;
239
240    fn agent(n: u8) -> AgentId {
241        AgentId([n; 32])
242    }
243
244    fn peer(n: u8) -> PeerId {
245        PeerId::new([n; 32])
246    }
247
248    fn list_id(n: u8) -> TaskListId {
249        TaskListId::new([n; 32])
250    }
251
252    fn make_task(id_byte: u8, peer: PeerId) -> TaskItem {
253        let agent = agent(1);
254        let task_id = TaskId::from_bytes([id_byte; 32]);
255        let metadata = TaskMetadata::new(
256            format!("Task {}", id_byte),
257            format!("Description {}", id_byte),
258            128,
259            agent,
260            1000,
261        );
262        TaskItem::new(task_id, metadata, peer)
263    }
264
265    #[tokio::test]
266    async fn test_task_list_sync_creation() {
267        let peer = peer(1);
268        let id = list_id(1);
269        let task_list = TaskList::new(id, "Test List".to_string(), peer);
270
271        // We cannot create a real PubSubManager in a unit test without a NetworkNode
272        // For now, we just verify the types are correct
273        let _list_for_sync = task_list;
274    }
275
276    #[tokio::test]
277    async fn test_apply_delta() {
278        // Create a task list
279        let peer1 = peer(1);
280        let peer2 = peer(2);
281        let id = list_id(1);
282        let task_list = TaskList::new(id, "Test".to_string(), peer1);
283
284        // Wrap in Arc<RwLock<>>
285        let task_list_arc = Arc::new(RwLock::new(task_list));
286
287        // Create a delta with a new task
288        let mut delta = TaskListDelta::new(1);
289        let task = make_task(1, peer2);
290        let task_id = *task.id();
291        let tag = (peer2, 1);
292        delta.added_tasks.insert(task_id, (task, tag));
293
294        // Apply delta directly (simulating what TaskListSync::apply_remote_delta does)
295        {
296            let mut list = task_list_arc.write().await;
297            let result = list.merge_delta(&delta, peer2);
298            assert!(result.is_ok());
299        }
300
301        // Verify task was added
302        {
303            let list = task_list_arc.read().await;
304            assert_eq!(list.task_count(), 1);
305        }
306    }
307
308    #[tokio::test]
309    async fn test_concurrent_access() {
310        // Test that RwLock allows multiple readers
311        let peer = peer(1);
312        let id = list_id(1);
313        let task_list = TaskList::new(id, "Test".to_string(), peer);
314        let task_list_arc = Arc::new(RwLock::new(task_list));
315
316        // Multiple concurrent reads should work
317        let list1 = task_list_arc.read().await;
318        let list2 = task_list_arc.read().await;
319
320        assert_eq!(list1.name(), "Test");
321        assert_eq!(list2.name(), "Test");
322
323        drop(list1);
324        drop(list2);
325
326        // Write should work after readers drop
327        {
328            let mut list = task_list_arc.write().await;
329            list.update_name("Updated".to_string(), peer);
330        }
331
332        // Verify update
333        let list = task_list_arc.read().await;
334        assert_eq!(list.name(), "Updated");
335    }
336}