1use crate::{CrdtEntry, CrdtKv, CrdtStore};
7use bytes::Bytes;
8use pollen_membership::Membership;
9use pollen_transport::{Envelope, MessageType, Transport};
10use pollen_types::{NodeId, Result};
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::watch;
15use tracing::{debug, info, warn};
16
/// Tuning parameters for the CRDT sync service.
#[derive(Debug, Clone)]
pub struct CrdtSyncConfig {
    /// Period between anti-entropy rounds (Merkle comparison with one random peer).
    pub sync_interval: Duration,
    /// Pacing interval for delta broadcasts.
    /// NOTE(review): not read by the sync loops in this module — confirm whether it is used elsewhere.
    pub broadcast_interval: Duration,
    /// Maximum number of entries packed into a single full-sync message.
    pub max_entries_per_msg: usize,
    /// Deadline for each request/response exchange with a peer.
    pub sync_timeout: Duration,
}

impl Default for CrdtSyncConfig {
    /// Defaults: 10 s between anti-entropy rounds, 100 ms broadcast interval,
    /// at most 100 entries per message, and a 5 s per-request timeout.
    fn default() -> Self {
        let sync_interval = Duration::from_millis(10_000);
        let sync_timeout = Duration::from_millis(5_000);
        let broadcast_interval = Duration::from_millis(100);

        Self {
            sync_interval,
            broadcast_interval,
            max_entries_per_msg: 100,
            sync_timeout,
        }
    }
}
40
41#[derive(Clone, Debug, Serialize, Deserialize)]
43pub struct MerkleRequest {
44 pub level: usize,
46 pub hashes: Vec<(String, Bytes)>,
48}
49
50#[derive(Clone, Debug, Serialize, Deserialize)]
52pub struct MerkleResponse {
53 pub level: usize,
55 pub hashes: Vec<(String, Bytes)>,
57 pub differing_buckets: Vec<String>,
59}
60
61#[derive(Clone, Debug, Serialize, Deserialize)]
63pub struct DataRangeRequest {
64 pub start: String,
66 pub end: String,
68}
69
70pub struct CrdtSyncService {
72 node_id: NodeId,
73 crdt_store: Arc<CrdtStore>,
74 transport: Arc<dyn Transport>,
75 membership: Arc<dyn Membership>,
76 config: CrdtSyncConfig,
77 shutdown: watch::Sender<bool>,
78}
79
80impl CrdtSyncService {
81 pub fn new(
83 node_id: NodeId,
84 crdt_store: Arc<CrdtStore>,
85 transport: Arc<dyn Transport>,
86 membership: Arc<dyn Membership>,
87 config: CrdtSyncConfig,
88 ) -> Self {
89 let (shutdown, _) = watch::channel(false);
90
91 Self {
92 node_id,
93 crdt_store,
94 transport,
95 membership,
96 config,
97 shutdown,
98 }
99 }
100
101 pub fn start(self: Arc<Self>) {
103 let service = Arc::clone(&self);
104 tokio::spawn(async move {
105 service.run_anti_entropy().await;
106 });
107
108 let service = Arc::clone(&self);
109 tokio::spawn(async move {
110 service.run_delta_broadcast().await;
111 });
112
113 info!("CRDT sync service started");
114 }
115
116 async fn run_anti_entropy(&self) {
118 let mut interval = tokio::time::interval(self.config.sync_interval);
119 let mut shutdown_rx = self.shutdown.subscribe();
120
121 loop {
122 tokio::select! {
123 _ = shutdown_rx.changed() => {
124 if *shutdown_rx.borrow() {
125 break;
126 }
127 }
128 _ = interval.tick() => {
129 if let Err(e) = self.anti_entropy_round().await {
130 warn!("Anti-entropy round failed: {}", e);
131 }
132 }
133 }
134 }
135 }
136
137 async fn anti_entropy_round(&self) -> Result<()> {
139 let peers = self.membership.alive_members();
140 if peers.len() <= 1 {
141 return Ok(());
143 }
144
145 let other_peers: Vec<_> = peers.iter().filter(|p| p.id != self.node_id).collect();
147 if other_peers.is_empty() {
148 return Ok(());
149 }
150
151 let peer_idx = rand::random::<usize>() % other_peers.len();
152 let peer = other_peers[peer_idx];
153
154 debug!("Starting anti-entropy sync with {:?}", peer.id);
155
156 self.sync_with_peer(peer.id, peer.addr).await
158 }
159
160 async fn sync_with_peer(&self, peer_id: NodeId, peer_addr: std::net::SocketAddr) -> Result<()> {
162 let my_root = self.crdt_store.merkle_root();
164
165 let request = MerkleRequest {
167 level: 0,
168 hashes: vec![("root".to_string(), my_root.clone())],
169 };
170
171 let payload = bincode::serialize(&request)?;
172 let envelope = Envelope::new(
173 self.node_id,
174 peer_id,
175 MessageType::MerkleTreeRequest,
176 Bytes::from(payload),
177 pollen_clock::Timestamp::zero(),
178 );
179
180 let response = match tokio::time::timeout(
182 self.config.sync_timeout,
183 self.transport.send_recv(peer_addr, envelope),
184 )
185 .await
186 {
187 Ok(Ok(resp)) => resp,
188 Ok(Err(e)) => {
189 debug!("Failed to get Merkle response from {:?}: {}", peer_id, e);
190 return Ok(());
191 }
192 Err(_) => {
193 debug!("Merkle request to {:?} timed out", peer_id);
194 return Ok(());
195 }
196 };
197
198 if response.msg_type != MessageType::MerkleTreeResponse {
200 return Ok(());
201 }
202
203 let merkle_response: MerkleResponse = bincode::deserialize(&response.payload)?;
204
205 if merkle_response.differing_buckets.is_empty() {
207 debug!("In sync with {:?}", peer_id);
208 return Ok(());
209 }
210
211 self.sync_differing_ranges(peer_id, peer_addr, &merkle_response.differing_buckets)
213 .await
214 }
215
216 async fn sync_differing_ranges(
218 &self,
219 peer_id: NodeId,
220 peer_addr: std::net::SocketAddr,
221 ranges: &[String],
222 ) -> Result<()> {
223 for range in ranges {
224 let (start, end) = self.range_from_bucket(range);
226
227 let request = DataRangeRequest {
228 start: start.clone(),
229 end: end.clone(),
230 };
231
232 let payload = bincode::serialize(&request)?;
233 let envelope = Envelope::new(
234 self.node_id,
235 peer_id,
236 MessageType::DataRangeRequest,
237 Bytes::from(payload),
238 pollen_clock::Timestamp::zero(),
239 );
240
241 let response = match tokio::time::timeout(
242 self.config.sync_timeout,
243 self.transport.send_recv(peer_addr, envelope),
244 )
245 .await
246 {
247 Ok(Ok(resp)) => resp,
248 Ok(Err(e)) => {
249 warn!("Failed to get data range from {:?}: {}", peer_id, e);
250 continue;
251 }
252 Err(_) => {
253 warn!("Data range request to {:?} timed out", peer_id);
254 continue;
255 }
256 };
257
258 if response.msg_type != MessageType::DataRangeResponse {
259 continue;
260 }
261
262 let entries: Vec<CrdtEntry> = bincode::deserialize(&response.payload)?;
264 for entry in entries {
265 if let Err(e) = self.crdt_store.apply_delta(entry).await {
266 warn!("Failed to apply synced entry: {}", e);
267 }
268 }
269
270 let our_entries = self.crdt_store.entries_in_range(&start, &end);
272 if !our_entries.is_empty() {
273 for chunk in our_entries.chunks(self.config.max_entries_per_msg) {
274 let payload = bincode::serialize(&chunk.to_vec())?;
275 let envelope = Envelope::new(
276 self.node_id,
277 peer_id,
278 MessageType::CrdtFullSync,
279 Bytes::from(payload),
280 pollen_clock::Timestamp::zero(),
281 );
282 let _ = self.transport.send(peer_addr, envelope).await;
283 }
284 }
285 }
286
287 Ok(())
288 }
289
290 fn range_from_bucket(&self, bucket: &str) -> (String, String) {
292 let start = bucket.to_string();
295 let end = format!("{}~", bucket); (start, end)
297 }
298
299 async fn run_delta_broadcast(&self) {
301 let mut rx = self.crdt_store.subscribe("");
302 let mut shutdown_rx = self.shutdown.subscribe();
303
304 loop {
305 tokio::select! {
306 _ = shutdown_rx.changed() => {
307 if *shutdown_rx.borrow() {
308 break;
309 }
310 }
311 event = rx.recv() => {
312 match event {
313 Ok(crate::CrdtEvent::Updated { key }) => {
314 if let Err(e) = self.broadcast_key(&key).await {
315 warn!("Failed to broadcast delta for {}: {}", key, e);
316 }
317 }
318 Ok(crate::CrdtEvent::Deleted { key }) => {
319 if let Err(e) = self.broadcast_key(&key).await {
320 warn!("Failed to broadcast deletion for {}: {}", key, e);
321 }
322 }
323 Err(_) => {
324 }
326 }
327 }
328 }
329 }
330 }
331
332 async fn broadcast_key(&self, key: &str) -> Result<()> {
334 let entries = self.crdt_store.entries_in_range(key, &format!("{}~", key));
335 if entries.is_empty() {
336 return Ok(());
337 }
338
339 let entry = &entries[0];
340 let peers = self.membership.alive_members();
341
342 for peer in peers {
343 if peer.id == self.node_id {
344 continue;
345 }
346
347 let payload = bincode::serialize(&vec![entry.clone()])?;
348 let envelope = Envelope::new(
349 self.node_id,
350 peer.id,
351 MessageType::CrdtDelta,
352 Bytes::from(payload),
353 pollen_clock::Timestamp::zero(),
354 );
355
356 let transport = Arc::clone(&self.transport);
358 let addr = peer.addr;
359 tokio::spawn(async move {
360 let _ = transport.send(addr, envelope).await;
361 });
362 }
363
364 Ok(())
365 }
366
367 pub async fn handle_message(&self, envelope: Envelope) -> Result<Option<Envelope>> {
369 match envelope.msg_type {
370 MessageType::CrdtDelta | MessageType::CrdtFullSync => {
371 let entries: Vec<CrdtEntry> = bincode::deserialize(&envelope.payload)?;
372 for entry in entries {
373 if let Err(e) = self.crdt_store.apply_delta(entry).await {
374 warn!("Failed to apply delta: {}", e);
375 }
376 }
377 Ok(None)
378 }
379
380 MessageType::MerkleTreeRequest => {
381 let request: MerkleRequest = bincode::deserialize(&envelope.payload)?;
382 let response = self.handle_merkle_request(request);
383 let payload = bincode::serialize(&response)?;
384
385 Ok(Some(Envelope::new(
386 self.node_id,
387 envelope.from,
388 MessageType::MerkleTreeResponse,
389 Bytes::from(payload),
390 pollen_clock::Timestamp::zero(),
391 )))
392 }
393
394 MessageType::DataRangeRequest => {
395 let request: DataRangeRequest = bincode::deserialize(&envelope.payload)?;
396 let entries = self.crdt_store.entries_in_range(&request.start, &request.end);
397 let payload = bincode::serialize(&entries)?;
398
399 Ok(Some(Envelope::new(
400 self.node_id,
401 envelope.from,
402 MessageType::DataRangeResponse,
403 Bytes::from(payload),
404 pollen_clock::Timestamp::zero(),
405 )))
406 }
407
408 _ => Ok(None),
409 }
410 }
411
412 fn handle_merkle_request(&self, request: MerkleRequest) -> MerkleResponse {
414 let my_hashes = self.crdt_store.merkle_level(request.level);
415
416 let mut differing = Vec::new();
418
419 for (bucket, their_hash) in &request.hashes {
420 let my_hash = my_hashes
421 .iter()
422 .find(|(b, _)| b == bucket)
423 .map(|(_, h)| h.clone());
424
425 match my_hash {
426 Some(h) if h != *their_hash => {
427 differing.push(bucket.clone());
428 }
429 None => {
430 differing.push(bucket.clone());
432 }
433 _ => {}
434 }
435 }
436
437 for (bucket, _) in &my_hashes {
439 if !request.hashes.iter().any(|(b, _)| b == bucket) {
440 differing.push(bucket.clone());
441 }
442 }
443
444 MerkleResponse {
445 level: request.level,
446 hashes: my_hashes,
447 differing_buckets: differing,
448 }
449 }
450
451 pub fn shutdown(&self) {
453 let _ = self.shutdown.send(true);
454 }
455}
456
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default value is pinned, not just the two durations.
    #[test]
    fn test_config_default() {
        let config = CrdtSyncConfig::default();
        assert_eq!(config.sync_interval, Duration::from_secs(10));
        assert_eq!(config.broadcast_interval, Duration::from_millis(100));
        assert_eq!(config.max_entries_per_msg, 100);
        assert_eq!(config.sync_timeout, Duration::from_secs(5));
    }

    #[test]
    fn test_merkle_request_serialization() {
        let request = MerkleRequest {
            level: 1,
            hashes: vec![
                ("bucket1".to_string(), Bytes::from("hash1")),
                ("bucket2".to_string(), Bytes::from("hash2")),
            ],
        };

        let serialized = bincode::serialize(&request).unwrap();
        let deserialized: MerkleRequest = bincode::deserialize(&serialized).unwrap();

        assert_eq!(deserialized.level, 1);
        assert_eq!(deserialized.hashes, request.hashes);
    }

    /// The response is the other half of the Merkle exchange and must round-trip too.
    #[test]
    fn test_merkle_response_serialization() {
        let response = MerkleResponse {
            level: 2,
            hashes: vec![("bucket1".to_string(), Bytes::from("hash1"))],
            differing_buckets: vec!["bucket1".to_string(), "bucket3".to_string()],
        };

        let serialized = bincode::serialize(&response).unwrap();
        let deserialized: MerkleResponse = bincode::deserialize(&serialized).unwrap();

        assert_eq!(deserialized.level, 2);
        assert_eq!(deserialized.hashes, response.hashes);
        assert_eq!(deserialized.differing_buckets, response.differing_buckets);
    }

    #[test]
    fn test_data_range_request_serialization() {
        let request = DataRangeRequest {
            start: "task:".to_string(),
            end: "task:~".to_string(),
        };

        let serialized = bincode::serialize(&request).unwrap();
        let deserialized: DataRangeRequest = bincode::deserialize(&serialized).unwrap();

        assert_eq!(deserialized.start, "task:");
        assert_eq!(deserialized.end, "task:~");
    }
}