x0x/crdt/sync.rs
//! Task list synchronization using anti-entropy gossip.
//!
//! This module provides automatic synchronization of TaskLists across peers
//! using saorsa-gossip's anti-entropy mechanism combined with pub/sub.
//!
//! ## Architecture
//!
//! - `TaskListSync` wraps a `TaskList` in `Arc<RwLock<_>>` for concurrent access
//! - Uses `AntiEntropyManager` for periodic background synchronization
//! - Publishes deltas to a gossip topic when local changes occur
//! - Subscribes to the topic to receive and apply remote deltas
//!
//! This provides eventual consistency across all peers sharing the same topic.
14
15use crate::crdt::{Result, TaskList, TaskListDelta};
16use crate::gossip::PubSubManager;
17use saorsa_gossip_crdt_sync::AntiEntropyManager;
18use saorsa_gossip_types::PeerId;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
22/// Synchronization wrapper for a TaskList.
23///
24/// Manages automatic background synchronization of a TaskList using
25/// anti-entropy gossip. Changes are propagated via deltas published
26/// to a gossip topic.
27pub struct TaskListSync {
28 /// The task list being synchronized (wrapped for concurrent access).
29 task_list: Arc<RwLock<TaskList>>,
30
31 /// Anti-entropy manager for periodic sync.
32 #[allow(dead_code)]
33 anti_entropy: AntiEntropyManager<TaskList>,
34
35 /// Pub/sub manager for topic-based messaging.
36 pubsub: Arc<PubSubManager>,
37
38 /// Topic name for this task list.
39 topic: String,
40}
41
42impl TaskListSync {
43 /// Create a new TaskList synchronization manager.
44 ///
45 /// # Arguments
46 ///
47 /// * `task_list` - The TaskList to synchronize
48 /// * `pubsub` - Pub/sub manager for gossip messaging
49 /// * `topic` - Topic name for pub/sub (typically task list ID)
50 /// * `sync_interval_secs` - How often to run anti-entropy (seconds)
51 ///
52 /// # Returns
53 ///
54 /// A new TaskListSync instance ready to start.
55 ///
56 /// # Errors
57 ///
58 /// Returns an error if initialization fails.
59 ///
60 /// # Example
61 ///
62 /// ```ignore
63 /// let task_list = TaskList::new(id, "My List".to_string(), peer_id);
64 /// let sync = TaskListSync::new(
65 /// task_list,
66 /// pubsub,
67 /// "tasklist-abc123".to_string(),
68 /// 30, // Sync every 30 seconds
69 /// )?;
70 /// ```
71 pub fn new(
72 task_list: TaskList,
73 pubsub: Arc<PubSubManager>,
74 topic: String,
75 sync_interval_secs: u64,
76 ) -> Result<Self> {
77 // Wrap task list for concurrent access
78 let task_list = Arc::new(RwLock::new(task_list));
79
80 // Create anti-entropy manager
81 let anti_entropy = AntiEntropyManager::new(Arc::clone(&task_list), sync_interval_secs);
82
83 Ok(Self {
84 task_list,
85 anti_entropy,
86 pubsub,
87 topic,
88 })
89 }
90
91 /// Start background synchronization.
92 ///
93 /// Subscribes to the gossip topic and begins receiving remote deltas.
94 /// This method returns immediately; synchronization runs in the background.
95 ///
96 /// # Returns
97 ///
98 /// Ok(()) if started successfully.
99 ///
100 /// # Errors
101 ///
102 /// Returns an error if subscription or anti-entropy startup fails.
103 pub async fn start(&self) -> Result<()> {
104 // Subscribe to topic — received messages will contain serialized deltas.
105 // The background task applies them via apply_remote_delta.
106 let mut sub = self.pubsub.subscribe(self.topic.clone()).await;
107 let task_list = Arc::clone(&self.task_list);
108
109 tokio::spawn(async move {
110 while let Some(msg) = sub.recv().await {
111 let decoded = {
112 use bincode::Options;
113 bincode::options()
114 .with_fixint_encoding()
115 .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
116 .allow_trailing_bytes()
117 .deserialize::<(PeerId, TaskListDelta)>(&msg.payload)
118 };
119 match decoded {
120 Ok((peer_id, delta)) => {
121 let mut list = task_list.write().await;
122 if let Err(e) = list.merge_delta(&delta, peer_id) {
123 tracing::warn!("Failed to merge remote delta: {}", e);
124 }
125 }
126 Err(e) => {
127 tracing::warn!("Failed to deserialize delta from topic: {}", e);
128 }
129 }
130 }
131 });
132
133 Ok(())
134 }
135
136 /// Stop background synchronization.
137 ///
138 /// Unsubscribes from the gossip topic.
139 ///
140 /// # Returns
141 ///
142 /// Ok(()) if stopped successfully.
143 ///
144 /// # Errors
145 ///
146 /// Returns an error if operations fail.
147 pub async fn stop(&self) -> Result<()> {
148 self.pubsub.unsubscribe(&self.topic).await;
149 Ok(())
150 }
151
152 /// Apply a delta received from a remote peer.
153 ///
154 /// This is called when a delta is received via the gossip topic.
155 /// The delta is merged into the local TaskList using CRDT semantics.
156 ///
157 /// # Arguments
158 ///
159 /// * `peer_id` - The peer who sent this delta
160 /// * `delta` - The delta to apply
161 ///
162 /// # Returns
163 ///
164 /// Ok(()) if the delta was applied successfully.
165 ///
166 /// # Errors
167 ///
168 /// Returns an error if the merge fails.
169 pub async fn apply_remote_delta(&self, peer_id: PeerId, delta: TaskListDelta) -> Result<()> {
170 let mut task_list = self.task_list.write().await;
171 task_list.merge_delta(&delta, peer_id)?;
172 Ok(())
173 }
174
175 /// Publish a local delta to the gossip network.
176 ///
177 /// Call this after making local changes to propagate them to other peers.
178 ///
179 /// # Arguments
180 ///
181 /// * `local_peer_id` - The local peer's ID
182 /// * `delta` - The delta to publish
183 ///
184 /// # Returns
185 ///
186 /// Ok(()) if published successfully.
187 ///
188 /// # Errors
189 ///
190 /// Returns an error if serialization or publishing fails.
191 pub async fn publish_delta(&self, local_peer_id: PeerId, delta: TaskListDelta) -> Result<()> {
192 let serialized = bincode::serialize(&(local_peer_id, delta)).map_err(|e| {
193 crate::crdt::CrdtError::Gossip(format!("failed to serialize delta: {e}"))
194 })?;
195
196 self.pubsub
197 .publish(self.topic.clone(), bytes::Bytes::from(serialized))
198 .await
199 .map_err(|e| crate::crdt::CrdtError::Gossip(format!("failed to publish delta: {e}")))?;
200
201 Ok(())
202 }
203
204 /// Get a read-only reference to the task list.
205 ///
206 /// Useful for querying the current state without modifying it.
207 ///
208 /// # Returns
209 ///
210 /// A read guard to the TaskList.
211 pub async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, TaskList> {
212 self.task_list.read().await
213 }
214
215 /// Get a mutable reference to the task list.
216 ///
217 /// Use this to make local changes. After modifying, call `publish_delta`
218 /// to propagate changes to peers.
219 ///
220 /// # Returns
221 ///
222 /// A write guard to the TaskList.
223 pub async fn write(&self) -> tokio::sync::RwLockWriteGuard<'_, TaskList> {
224 self.task_list.write().await
225 }
226
227 /// Get the topic name for this task list.
228 #[must_use]
229 pub fn topic(&self) -> &str {
230 &self.topic
231 }
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt::{TaskId, TaskItem, TaskListId, TaskMetadata};
    use crate::identity::AgentId;

    /// Deterministic agent id whose 32 bytes are all `n`.
    fn agent(n: u8) -> AgentId {
        AgentId([n; 32])
    }

    /// Deterministic peer id whose 32 bytes are all `n`.
    fn peer(n: u8) -> PeerId {
        PeerId::new([n; 32])
    }

    /// Deterministic task list id whose 32 bytes are all `n`.
    fn list_id(n: u8) -> TaskListId {
        TaskListId::new([n; 32])
    }

    /// Build a task whose id, title and description are derived from `id_byte`.
    fn make_task(id_byte: u8, peer: PeerId) -> TaskItem {
        let metadata = TaskMetadata::new(
            format!("Task {id_byte}"),
            format!("Description {id_byte}"),
            128,
            agent(1),
            1000,
        );
        TaskItem::new(TaskId::from_bytes([id_byte; 32]), metadata, peer)
    }

    #[test]
    fn test_task_list_sync_creation() {
        // A real PubSubManager cannot be built in a unit test without a
        // NetworkNode, so `TaskListSync::new` itself is not exercised here.
        // Instead, check the state of the list that would be handed to it.
        let task_list = TaskList::new(list_id(1), "Test List".to_string(), peer(1));

        assert_eq!(task_list.name(), "Test List");
        assert_eq!(task_list.task_count(), 0);
    }

    #[tokio::test]
    async fn test_apply_delta() {
        let local = peer(1);
        let remote = peer(2);
        let shared = Arc::new(RwLock::new(TaskList::new(
            list_id(1),
            "Test".to_string(),
            local,
        )));

        // A delta carrying one task that was added by the remote peer.
        let mut delta = TaskListDelta::new(1);
        let task = make_task(1, remote);
        let task_id = *task.id();
        delta.added_tasks.insert(task_id, (task, (remote, 1)));

        // Same lock-then-merge sequence as `TaskListSync::apply_remote_delta`.
        {
            let mut list = shared.write().await;
            assert!(list.merge_delta(&delta, remote).is_ok());
        }

        // The merged task must now be visible to readers.
        assert_eq!(shared.read().await.task_count(), 1);
    }

    #[tokio::test]
    async fn test_concurrent_access() {
        let owner = peer(1);
        let shared = Arc::new(RwLock::new(TaskList::new(
            list_id(1),
            "Test".to_string(),
            owner,
        )));

        // Two read guards may be held at the same time.
        let first = shared.read().await;
        let second = shared.read().await;
        assert_eq!(first.name(), "Test");
        assert_eq!(second.name(), "Test");

        // Release both readers, otherwise the write below would never acquire.
        drop(first);
        drop(second);

        {
            let mut list = shared.write().await;
            list.update_name("Updated".to_string(), owner);
        }

        assert_eq!(shared.read().await.name(), "Updated");
    }
}