Skip to main content

pollen_crdt/
sync.rs

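//! CRDT synchronization service.
//!
//! Keeps replicas converged in two ways: local changes are pushed to live peers as
//! deltas as soon as the store reports them, and periodic anti-entropy rounds
//! compare Merkle hashes with a random peer and exchange entries for every bucket
//! whose hashes differ.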
5
6use crate::{CrdtEntry, CrdtKv, CrdtStore};
7use bytes::Bytes;
8use pollen_membership::Membership;
9use pollen_transport::{Envelope, MessageType, Transport};
10use pollen_types::{NodeId, Result};
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::watch;
15use tracing::{debug, info, warn};
16
/// Tuning knobs for the CRDT sync service.
#[derive(Clone, Debug)]
pub struct CrdtSyncConfig {
    /// How often an anti-entropy round is started against a random live peer.
    pub sync_interval: Duration,
    /// Interval for delta broadcasting.
    ///
    /// NOTE(review): the sync service in this module pushes deltas as soon as the
    /// store reports a change and never reads this value; confirm whether other
    /// code relies on it.
    pub broadcast_interval: Duration,
    /// Maximum number of entries packed into one push message during range sync.
    ///
    /// Must be at least 1.
    pub max_entries_per_msg: usize,
    /// Upper bound on how long to wait for a peer's reply to a single sync request.
    pub sync_timeout: Duration,
}
29
30impl Default for CrdtSyncConfig {
31    fn default() -> Self {
32        Self {
33            sync_interval: Duration::from_secs(10),
34            broadcast_interval: Duration::from_millis(100),
35            max_entries_per_msg: 100,
36            sync_timeout: Duration::from_secs(5),
37        }
38    }
39}
40
41/// Merkle tree sync request.
42#[derive(Clone, Debug, Serialize, Deserialize)]
43pub struct MerkleRequest {
44    /// Level in the Merkle tree (0 = root, 1 = first level, etc.)
45    pub level: usize,
46    /// Our hashes at this level.
47    pub hashes: Vec<(String, Bytes)>,
48}
49
50/// Merkle tree sync response.
51#[derive(Clone, Debug, Serialize, Deserialize)]
52pub struct MerkleResponse {
53    /// Level in the Merkle tree.
54    pub level: usize,
55    /// Their hashes at this level.
56    pub hashes: Vec<(String, Bytes)>,
57    /// Differing buckets to drill down into.
58    pub differing_buckets: Vec<String>,
59}
60
61/// Data range request.
62#[derive(Clone, Debug, Serialize, Deserialize)]
63pub struct DataRangeRequest {
64    /// Start key (inclusive).
65    pub start: String,
66    /// End key (exclusive).
67    pub end: String,
68}
69
70/// CRDT synchronization service.
71pub struct CrdtSyncService {
72    node_id: NodeId,
73    crdt_store: Arc<CrdtStore>,
74    transport: Arc<dyn Transport>,
75    membership: Arc<dyn Membership>,
76    config: CrdtSyncConfig,
77    shutdown: watch::Sender<bool>,
78}
79
80impl CrdtSyncService {
81    /// Create a new sync service.
82    pub fn new(
83        node_id: NodeId,
84        crdt_store: Arc<CrdtStore>,
85        transport: Arc<dyn Transport>,
86        membership: Arc<dyn Membership>,
87        config: CrdtSyncConfig,
88    ) -> Self {
89        let (shutdown, _) = watch::channel(false);
90
91        Self {
92            node_id,
93            crdt_store,
94            transport,
95            membership,
96            config,
97            shutdown,
98        }
99    }
100
101    /// Start the sync service.
102    pub fn start(self: Arc<Self>) {
103        let service = Arc::clone(&self);
104        tokio::spawn(async move {
105            service.run_anti_entropy().await;
106        });
107
108        let service = Arc::clone(&self);
109        tokio::spawn(async move {
110            service.run_delta_broadcast().await;
111        });
112
113        info!("CRDT sync service started");
114    }
115
116    /// Run anti-entropy loop.
117    async fn run_anti_entropy(&self) {
118        let mut interval = tokio::time::interval(self.config.sync_interval);
119        let mut shutdown_rx = self.shutdown.subscribe();
120
121        loop {
122            tokio::select! {
123                _ = shutdown_rx.changed() => {
124                    if *shutdown_rx.borrow() {
125                        break;
126                    }
127                }
128                _ = interval.tick() => {
129                    if let Err(e) = self.anti_entropy_round().await {
130                        warn!("Anti-entropy round failed: {}", e);
131                    }
132                }
133            }
134        }
135    }
136
137    /// Perform one round of anti-entropy sync with a random peer.
138    async fn anti_entropy_round(&self) -> Result<()> {
139        let peers = self.membership.alive_members();
140        if peers.len() <= 1 {
141            // No other peers to sync with
142            return Ok(());
143        }
144
145        // Pick a random peer (excluding self)
146        let other_peers: Vec<_> = peers.iter().filter(|p| p.id != self.node_id).collect();
147        if other_peers.is_empty() {
148            return Ok(());
149        }
150
151        let peer_idx = rand::random::<usize>() % other_peers.len();
152        let peer = other_peers[peer_idx];
153
154        debug!("Starting anti-entropy sync with {:?}", peer.id);
155
156        // Start with root level comparison
157        self.sync_with_peer(peer.id, peer.addr).await
158    }
159
160    /// Perform hierarchical Merkle tree sync with a specific peer.
161    async fn sync_with_peer(&self, peer_id: NodeId, peer_addr: std::net::SocketAddr) -> Result<()> {
162        // Get our Merkle root
163        let my_root = self.crdt_store.merkle_root();
164
165        // Request peer's root
166        let request = MerkleRequest {
167            level: 0,
168            hashes: vec![("root".to_string(), my_root.clone())],
169        };
170
171        let payload = bincode::serialize(&request)?;
172        let envelope = Envelope::new(
173            self.node_id,
174            peer_id,
175            MessageType::MerkleTreeRequest,
176            Bytes::from(payload),
177            pollen_clock::Timestamp::zero(),
178        );
179
180        // Send and wait for response
181        let response = match tokio::time::timeout(
182            self.config.sync_timeout,
183            self.transport.send_recv(peer_addr, envelope),
184        )
185        .await
186        {
187            Ok(Ok(resp)) => resp,
188            Ok(Err(e)) => {
189                debug!("Failed to get Merkle response from {:?}: {}", peer_id, e);
190                return Ok(());
191            }
192            Err(_) => {
193                debug!("Merkle request to {:?} timed out", peer_id);
194                return Ok(());
195            }
196        };
197
198        // Parse response
199        if response.msg_type != MessageType::MerkleTreeResponse {
200            return Ok(());
201        }
202
203        let merkle_response: MerkleResponse = bincode::deserialize(&response.payload)?;
204
205        // If roots match, we're in sync
206        if merkle_response.differing_buckets.is_empty() {
207            debug!("In sync with {:?}", peer_id);
208            return Ok(());
209        }
210
211        // Drill down into differing buckets
212        self.sync_differing_ranges(peer_id, peer_addr, &merkle_response.differing_buckets)
213            .await
214    }
215
216    /// Sync specific key ranges that differ.
217    async fn sync_differing_ranges(
218        &self,
219        peer_id: NodeId,
220        peer_addr: std::net::SocketAddr,
221        ranges: &[String],
222    ) -> Result<()> {
223        for range in ranges {
224            // Request data in this range
225            let (start, end) = self.range_from_bucket(range);
226
227            let request = DataRangeRequest {
228                start: start.clone(),
229                end: end.clone(),
230            };
231
232            let payload = bincode::serialize(&request)?;
233            let envelope = Envelope::new(
234                self.node_id,
235                peer_id,
236                MessageType::DataRangeRequest,
237                Bytes::from(payload),
238                pollen_clock::Timestamp::zero(),
239            );
240
241            let response = match tokio::time::timeout(
242                self.config.sync_timeout,
243                self.transport.send_recv(peer_addr, envelope),
244            )
245            .await
246            {
247                Ok(Ok(resp)) => resp,
248                Ok(Err(e)) => {
249                    warn!("Failed to get data range from {:?}: {}", peer_id, e);
250                    continue;
251                }
252                Err(_) => {
253                    warn!("Data range request to {:?} timed out", peer_id);
254                    continue;
255                }
256            };
257
258            if response.msg_type != MessageType::DataRangeResponse {
259                continue;
260            }
261
262            // Apply received entries
263            let entries: Vec<CrdtEntry> = bincode::deserialize(&response.payload)?;
264            for entry in entries {
265                if let Err(e) = self.crdt_store.apply_delta(entry).await {
266                    warn!("Failed to apply synced entry: {}", e);
267                }
268            }
269
270            // Send our entries in this range to peer
271            let our_entries = self.crdt_store.entries_in_range(&start, &end);
272            if !our_entries.is_empty() {
273                for chunk in our_entries.chunks(self.config.max_entries_per_msg) {
274                    let payload = bincode::serialize(&chunk.to_vec())?;
275                    let envelope = Envelope::new(
276                        self.node_id,
277                        peer_id,
278                        MessageType::CrdtFullSync,
279                        Bytes::from(payload),
280                        pollen_clock::Timestamp::zero(),
281                    );
282                    let _ = self.transport.send(peer_addr, envelope).await;
283                }
284            }
285        }
286
287        Ok(())
288    }
289
290    /// Convert a bucket identifier to a key range.
291    fn range_from_bucket(&self, bucket: &str) -> (String, String) {
292        // Bucket format: "level:index" or just use the bucket as a prefix
293        // For simplicity, treat bucket as a key prefix
294        let start = bucket.to_string();
295        let end = format!("{}~", bucket); // '~' is after most printable chars in ASCII
296        (start, end)
297    }
298
299    /// Run delta broadcast loop.
300    async fn run_delta_broadcast(&self) {
301        let mut rx = self.crdt_store.subscribe("");
302        let mut shutdown_rx = self.shutdown.subscribe();
303
304        loop {
305            tokio::select! {
306                _ = shutdown_rx.changed() => {
307                    if *shutdown_rx.borrow() {
308                        break;
309                    }
310                }
311                event = rx.recv() => {
312                    match event {
313                        Ok(crate::CrdtEvent::Updated { key }) => {
314                            if let Err(e) = self.broadcast_key(&key).await {
315                                warn!("Failed to broadcast delta for {}: {}", key, e);
316                            }
317                        }
318                        Ok(crate::CrdtEvent::Deleted { key }) => {
319                            if let Err(e) = self.broadcast_key(&key).await {
320                                warn!("Failed to broadcast deletion for {}: {}", key, e);
321                            }
322                        }
323                        Err(_) => {
324                            // Channel lagged, that's okay
325                        }
326                    }
327                }
328            }
329        }
330    }
331
332    /// Broadcast a key update to all peers.
333    async fn broadcast_key(&self, key: &str) -> Result<()> {
334        let entries = self.crdt_store.entries_in_range(key, &format!("{}~", key));
335        if entries.is_empty() {
336            return Ok(());
337        }
338
339        let entry = &entries[0];
340        let peers = self.membership.alive_members();
341
342        for peer in peers {
343            if peer.id == self.node_id {
344                continue;
345            }
346
347            let payload = bincode::serialize(&vec![entry.clone()])?;
348            let envelope = Envelope::new(
349                self.node_id,
350                peer.id,
351                MessageType::CrdtDelta,
352                Bytes::from(payload),
353                pollen_clock::Timestamp::zero(),
354            );
355
356            // Fire and forget
357            let transport = Arc::clone(&self.transport);
358            let addr = peer.addr;
359            tokio::spawn(async move {
360                let _ = transport.send(addr, envelope).await;
361            });
362        }
363
364        Ok(())
365    }
366
367    /// Handle incoming CRDT messages.
368    pub async fn handle_message(&self, envelope: Envelope) -> Result<Option<Envelope>> {
369        match envelope.msg_type {
370            MessageType::CrdtDelta | MessageType::CrdtFullSync => {
371                let entries: Vec<CrdtEntry> = bincode::deserialize(&envelope.payload)?;
372                for entry in entries {
373                    if let Err(e) = self.crdt_store.apply_delta(entry).await {
374                        warn!("Failed to apply delta: {}", e);
375                    }
376                }
377                Ok(None)
378            }
379
380            MessageType::MerkleTreeRequest => {
381                let request: MerkleRequest = bincode::deserialize(&envelope.payload)?;
382                let response = self.handle_merkle_request(request);
383                let payload = bincode::serialize(&response)?;
384
385                Ok(Some(Envelope::new(
386                    self.node_id,
387                    envelope.from,
388                    MessageType::MerkleTreeResponse,
389                    Bytes::from(payload),
390                    pollen_clock::Timestamp::zero(),
391                )))
392            }
393
394            MessageType::DataRangeRequest => {
395                let request: DataRangeRequest = bincode::deserialize(&envelope.payload)?;
396                let entries = self.crdt_store.entries_in_range(&request.start, &request.end);
397                let payload = bincode::serialize(&entries)?;
398
399                Ok(Some(Envelope::new(
400                    self.node_id,
401                    envelope.from,
402                    MessageType::DataRangeResponse,
403                    Bytes::from(payload),
404                    pollen_clock::Timestamp::zero(),
405                )))
406            }
407
408            _ => Ok(None),
409        }
410    }
411
412    /// Handle Merkle tree request and compute differing buckets.
413    fn handle_merkle_request(&self, request: MerkleRequest) -> MerkleResponse {
414        let my_hashes = self.crdt_store.merkle_level(request.level);
415
416        // Find differing buckets
417        let mut differing = Vec::new();
418
419        for (bucket, their_hash) in &request.hashes {
420            let my_hash = my_hashes
421                .iter()
422                .find(|(b, _)| b == bucket)
423                .map(|(_, h)| h.clone());
424
425            match my_hash {
426                Some(h) if h != *their_hash => {
427                    differing.push(bucket.clone());
428                }
429                None => {
430                    // We don't have this bucket, might need their data
431                    differing.push(bucket.clone());
432                }
433                _ => {}
434            }
435        }
436
437        // Also check for buckets we have that they don't
438        for (bucket, _) in &my_hashes {
439            if !request.hashes.iter().any(|(b, _)| b == bucket) {
440                differing.push(bucket.clone());
441            }
442        }
443
444        MerkleResponse {
445            level: request.level,
446            hashes: my_hashes,
447            differing_buckets: differing,
448        }
449    }
450
451    /// Shutdown the sync service.
452    pub fn shutdown(&self) {
453        let _ = self.shutdown.send(true);
454    }
455}
456
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default value is pinned, not just the two timing fields.
    #[test]
    fn test_config_default() {
        let config = CrdtSyncConfig::default();
        assert_eq!(config.sync_interval, Duration::from_secs(10));
        assert_eq!(config.broadcast_interval, Duration::from_millis(100));
        assert_eq!(config.max_entries_per_msg, 100);
        assert_eq!(config.sync_timeout, Duration::from_secs(5));
    }

    #[test]
    fn test_merkle_request_serialization() {
        let request = MerkleRequest {
            level: 1,
            hashes: vec![
                ("bucket1".to_string(), Bytes::from("hash1")),
                ("bucket2".to_string(), Bytes::from("hash2")),
            ],
        };

        let serialized = bincode::serialize(&request).unwrap();
        let deserialized: MerkleRequest = bincode::deserialize(&serialized).unwrap();

        assert_eq!(deserialized.level, 1);
        // Compare contents, not just the count, so a field mix-up would be caught.
        assert_eq!(deserialized.hashes, request.hashes);
    }

    #[test]
    fn test_merkle_response_serialization() {
        let response = MerkleResponse {
            level: 0,
            hashes: vec![("root".to_string(), Bytes::from("roothash"))],
            differing_buckets: vec!["root".to_string()],
        };

        let serialized = bincode::serialize(&response).unwrap();
        let deserialized: MerkleResponse = bincode::deserialize(&serialized).unwrap();

        assert_eq!(deserialized.level, 0);
        assert_eq!(deserialized.hashes, response.hashes);
        assert_eq!(deserialized.differing_buckets, response.differing_buckets);
    }

    #[test]
    fn test_data_range_request_serialization() {
        let request = DataRangeRequest {
            start: "task:".to_string(),
            end: "task:~".to_string(),
        };

        let serialized = bincode::serialize(&request).unwrap();
        let deserialized: DataRangeRequest = bincode::deserialize(&serialized).unwrap();

        assert_eq!(deserialized.start, "task:");
        assert_eq!(deserialized.end, "task:~");
    }
}