Skip to main content

cliffy_protocols/
sync.rs

//! Synchronization protocol for P2P state coordination
//!
//! This module defines the protocol messages and types for peer-to-peer
//! state synchronization using geometric deltas and vector clocks.
//!
//! # Protocol Overview
//!
//! The sync protocol operates in three phases:
//! 1. **Discovery**: Peers announce themselves and exchange clock information
//! 2. **Sync**: Peers exchange deltas to converge on consistent state
//! 3. **Maintenance**: Periodic heartbeats and partition recovery
//!
//! # Example
//!
//! ```rust
//! use cliffy_protocols::sync::{SyncMessage, SyncState, PeerInfo};
//! use cliffy_protocols::VectorClock;
//! use uuid::Uuid;
//!
//! // Create sync state for a node
//! let node_id = Uuid::new_v4();
//! let mut sync_state = SyncState::new(node_id);
//!
//! // Register a peer
//! let peer_id = Uuid::new_v4();
//! sync_state.register_peer(peer_id, VectorClock::new());
//! ```
28
29use crate::delta::DeltaBatch;
30use crate::serde_ga3;
31use crate::VectorClock;
32use cliffy_core::GA3;
33use serde::{Deserialize, Serialize};
34use std::collections::HashMap;
35use std::time::{Duration, Instant};
36use uuid::Uuid;
37
38/// A message in the sync protocol.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct SyncMessage {
41    /// Unique message ID
42    pub id: u64,
43    /// Sender's node ID
44    pub sender: Uuid,
45    /// Message type and payload
46    pub payload: SyncPayload,
47    /// Sender's current vector clock
48    pub clock: VectorClock,
49    /// Timestamp (milliseconds since epoch, for debugging)
50    pub timestamp: u64,
51}
52
53/// The payload of a sync message.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub enum SyncPayload {
56    /// Announce presence to peers
57    Hello(PeerInfo),
58
59    /// Request clock comparison
60    ClockRequest,
61
62    /// Response with current clock
63    ClockResponse(VectorClock),
64
65    /// Request deltas since a given clock
66    DeltaRequest {
67        /// The clock the requester has
68        since_clock: VectorClock,
69    },
70
71    /// Response with deltas
72    DeltaResponse {
73        /// Deltas to apply
74        deltas: DeltaBatch,
75        /// Whether more deltas are available
76        has_more: bool,
77    },
78
79    /// Full state sync (for new peers or recovery)
80    FullState {
81        /// The complete state
82        #[serde(with = "serde_ga3")]
83        state: GA3,
84        /// Clock at this state
85        clock: VectorClock,
86    },
87
88    /// Heartbeat to maintain connection
89    Heartbeat,
90
91    /// Acknowledge receipt of deltas
92    Ack {
93        /// Message ID being acknowledged
94        message_id: u64,
95        /// Applied clock
96        applied_clock: VectorClock,
97    },
98
99    /// Goodbye message when leaving
100    Goodbye,
101}
102
103/// Information about a peer node.
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct PeerInfo {
106    /// The peer's node ID
107    pub node_id: Uuid,
108    /// Human-readable name (optional)
109    pub name: Option<String>,
110    /// Capabilities this peer supports
111    pub capabilities: PeerCapabilities,
112    /// Protocol version
113    pub protocol_version: u32,
114}
115
116/// Capabilities a peer may support.
117#[derive(Debug, Clone, Default, Serialize, Deserialize)]
118pub struct PeerCapabilities {
119    /// Supports compressed deltas
120    pub compressed_deltas: bool,
121    /// Supports batch operations
122    pub batch_operations: bool,
123    /// Maximum batch size supported
124    pub max_batch_size: usize,
125    /// Supports full state snapshots
126    pub full_state_sync: bool,
127}
128
129impl PeerCapabilities {
130    /// Create default capabilities.
131    pub fn default_capabilities() -> Self {
132        Self {
133            compressed_deltas: true,
134            batch_operations: true,
135            max_batch_size: 100,
136            full_state_sync: true,
137        }
138    }
139}
140
/// Lifecycle stage of the connection to a single peer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PeerConnectionState {
    /// Peer is known but no sync has happened yet.
    Discovered,
    /// A synchronization with the peer is in progress.
    Syncing,
    /// Peer is fully synchronized.
    Synced,
    /// Connection was lost; recovery is being attempted.
    Disconnected,
    /// Peer has left the network.
    Gone,
}
155
156/// Tracked state for a connected peer.
157#[derive(Debug, Clone)]
158pub struct PeerState {
159    /// Peer information
160    pub info: PeerInfo,
161    /// Last known clock from this peer
162    pub last_clock: VectorClock,
163    /// Connection state
164    pub connection_state: PeerConnectionState,
165    /// Last message received timestamp
166    pub last_seen: Option<Instant>,
167    /// Pending acknowledgments (message_id -> sent_time)
168    pub pending_acks: HashMap<u64, Instant>,
169    /// Round-trip time estimate (in milliseconds)
170    pub rtt_estimate: Option<Duration>,
171}
172
173impl PeerState {
174    /// Create a new peer state.
175    pub fn new(info: PeerInfo, clock: VectorClock) -> Self {
176        Self {
177            info,
178            last_clock: clock,
179            connection_state: PeerConnectionState::Discovered,
180            last_seen: None,
181            pending_acks: HashMap::new(),
182            rtt_estimate: None,
183        }
184    }
185
186    /// Update the last seen time.
187    pub fn touch(&mut self) {
188        self.last_seen = Some(Instant::now());
189    }
190
191    /// Check if the peer is considered stale (no recent activity).
192    pub fn is_stale(&self, timeout: Duration) -> bool {
193        match self.last_seen {
194            Some(last) => last.elapsed() > timeout,
195            None => true,
196        }
197    }
198
199    /// Record that we sent a message requiring acknowledgment.
200    pub fn expect_ack(&mut self, message_id: u64) {
201        self.pending_acks.insert(message_id, Instant::now());
202    }
203
204    /// Record that we received an acknowledgment.
205    pub fn receive_ack(&mut self, message_id: u64) {
206        if let Some(sent_time) = self.pending_acks.remove(&message_id) {
207            let rtt = sent_time.elapsed();
208            self.rtt_estimate = Some(match self.rtt_estimate {
209                Some(prev) => Duration::from_millis(
210                    (prev.as_millis() as f64 * 0.8 + rtt.as_millis() as f64 * 0.2) as u64,
211                ),
212                None => rtt,
213            });
214        }
215    }
216}
217
218/// The synchronization state for a node.
219#[derive(Debug)]
220pub struct SyncState {
221    /// This node's ID
222    pub node_id: Uuid,
223    /// Current vector clock
224    pub clock: VectorClock,
225    /// Known peers
226    pub peers: HashMap<Uuid, PeerState>,
227    /// Next message ID
228    next_message_id: u64,
229    /// Configuration
230    pub config: SyncConfig,
231}
232
/// Tunable parameters that control sync behavior.
#[derive(Clone, Debug)]
pub struct SyncConfig {
    /// Interval between heartbeats.
    pub heartbeat_interval: Duration,
    /// Silence after which a peer counts as stale.
    pub peer_timeout: Duration,
    /// Upper bound on deltas per batch.
    pub max_batch_size: usize,
    /// Whether compressed deltas are preferred.
    pub prefer_compressed: bool,
    /// Version of the sync protocol.
    pub protocol_version: u32,
}

impl Default for SyncConfig {
    /// Defaults: 5 s heartbeat, 30 s peer timeout, batches of up to 100
    /// deltas, compression preferred, protocol version 1.
    fn default() -> Self {
        const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
        const PEER_TIMEOUT: Duration = Duration::from_secs(30);

        SyncConfig {
            heartbeat_interval: HEARTBEAT_INTERVAL,
            peer_timeout: PEER_TIMEOUT,
            max_batch_size: 100,
            prefer_compressed: true,
            protocol_version: 1,
        }
    }
}
259
260impl SyncState {
261    /// Create a new sync state for a node.
262    pub fn new(node_id: Uuid) -> Self {
263        Self {
264            node_id,
265            clock: VectorClock::new(),
266            peers: HashMap::new(),
267            next_message_id: 0,
268            config: SyncConfig::default(),
269        }
270    }
271
272    /// Create with custom configuration.
273    pub fn with_config(node_id: Uuid, config: SyncConfig) -> Self {
274        Self {
275            node_id,
276            clock: VectorClock::new(),
277            peers: HashMap::new(),
278            next_message_id: 0,
279            config,
280        }
281    }
282
283    /// Register a new peer.
284    pub fn register_peer(&mut self, peer_id: Uuid, clock: VectorClock) {
285        let info = PeerInfo {
286            node_id: peer_id,
287            name: None,
288            capabilities: PeerCapabilities::default_capabilities(),
289            protocol_version: self.config.protocol_version,
290        };
291        self.peers.insert(peer_id, PeerState::new(info, clock));
292    }
293
294    /// Register a peer with full info.
295    pub fn register_peer_with_info(&mut self, info: PeerInfo, clock: VectorClock) {
296        let peer_id = info.node_id;
297        self.peers.insert(peer_id, PeerState::new(info, clock));
298    }
299
300    /// Remove a peer.
301    pub fn remove_peer(&mut self, peer_id: &Uuid) {
302        self.peers.remove(peer_id);
303    }
304
305    /// Get a peer's state.
306    pub fn get_peer(&self, peer_id: &Uuid) -> Option<&PeerState> {
307        self.peers.get(peer_id)
308    }
309
310    /// Get a mutable reference to a peer's state.
311    pub fn get_peer_mut(&mut self, peer_id: &Uuid) -> Option<&mut PeerState> {
312        self.peers.get_mut(peer_id)
313    }
314
315    /// Update our clock and get next message ID.
316    pub fn tick(&mut self) -> u64 {
317        self.clock.tick(self.node_id);
318        let id = self.next_message_id;
319        self.next_message_id += 1;
320        id
321    }
322
323    /// Create a hello message.
324    pub fn create_hello(&mut self, name: Option<String>) -> SyncMessage {
325        let id = self.tick();
326        SyncMessage {
327            id,
328            sender: self.node_id,
329            payload: SyncPayload::Hello(PeerInfo {
330                node_id: self.node_id,
331                name,
332                capabilities: PeerCapabilities::default_capabilities(),
333                protocol_version: self.config.protocol_version,
334            }),
335            clock: self.clock.clone(),
336            timestamp: current_timestamp_ms(),
337        }
338    }
339
340    /// Create a delta request message.
341    pub fn create_delta_request(&mut self, since_clock: VectorClock) -> SyncMessage {
342        let id = self.tick();
343        SyncMessage {
344            id,
345            sender: self.node_id,
346            payload: SyncPayload::DeltaRequest { since_clock },
347            clock: self.clock.clone(),
348            timestamp: current_timestamp_ms(),
349        }
350    }
351
352    /// Create a delta response message.
353    pub fn create_delta_response(&mut self, deltas: DeltaBatch, has_more: bool) -> SyncMessage {
354        let id = self.tick();
355        SyncMessage {
356            id,
357            sender: self.node_id,
358            payload: SyncPayload::DeltaResponse { deltas, has_more },
359            clock: self.clock.clone(),
360            timestamp: current_timestamp_ms(),
361        }
362    }
363
364    /// Create a full state message.
365    pub fn create_full_state(&mut self, state: GA3) -> SyncMessage {
366        let id = self.tick();
367        SyncMessage {
368            id,
369            sender: self.node_id,
370            payload: SyncPayload::FullState {
371                state,
372                clock: self.clock.clone(),
373            },
374            clock: self.clock.clone(),
375            timestamp: current_timestamp_ms(),
376        }
377    }
378
379    /// Create a heartbeat message.
380    pub fn create_heartbeat(&mut self) -> SyncMessage {
381        let id = self.tick();
382        SyncMessage {
383            id,
384            sender: self.node_id,
385            payload: SyncPayload::Heartbeat,
386            clock: self.clock.clone(),
387            timestamp: current_timestamp_ms(),
388        }
389    }
390
391    /// Create an acknowledgment message.
392    pub fn create_ack(&mut self, message_id: u64) -> SyncMessage {
393        let id = self.tick();
394        SyncMessage {
395            id,
396            sender: self.node_id,
397            payload: SyncPayload::Ack {
398                message_id,
399                applied_clock: self.clock.clone(),
400            },
401            clock: self.clock.clone(),
402            timestamp: current_timestamp_ms(),
403        }
404    }
405
406    /// Create a goodbye message.
407    pub fn create_goodbye(&mut self) -> SyncMessage {
408        let id = self.tick();
409        SyncMessage {
410            id,
411            sender: self.node_id,
412            payload: SyncPayload::Goodbye,
413            clock: self.clock.clone(),
414            timestamp: current_timestamp_ms(),
415        }
416    }
417
418    /// Handle an incoming message from a peer.
419    pub fn handle_message(&mut self, message: &SyncMessage) -> Option<SyncMessage> {
420        // Update peer state
421        if let Some(peer) = self.peers.get_mut(&message.sender) {
422            peer.touch();
423            peer.last_clock = message.clock.clone();
424        }
425
426        // Update our clock
427        self.clock.update(&message.clock);
428
429        match &message.payload {
430            SyncPayload::Hello(info) => {
431                self.register_peer_with_info(info.clone(), message.clock.clone());
432                Some(self.create_hello(None))
433            }
434            SyncPayload::ClockRequest => Some(SyncMessage {
435                id: self.tick(),
436                sender: self.node_id,
437                payload: SyncPayload::ClockResponse(self.clock.clone()),
438                clock: self.clock.clone(),
439                timestamp: current_timestamp_ms(),
440            }),
441            SyncPayload::Heartbeat => {
442                // No response needed, just updated peer state above
443                None
444            }
445            SyncPayload::Ack {
446                message_id,
447                applied_clock: _,
448            } => {
449                if let Some(peer) = self.peers.get_mut(&message.sender) {
450                    peer.receive_ack(*message_id);
451                }
452                None
453            }
454            SyncPayload::Goodbye => {
455                if let Some(peer) = self.peers.get_mut(&message.sender) {
456                    peer.connection_state = PeerConnectionState::Gone;
457                }
458                None
459            }
460            // Other message types need application-level handling
461            _ => None,
462        }
463    }
464
465    /// Get list of stale peers that should be checked.
466    pub fn stale_peers(&self) -> Vec<Uuid> {
467        self.peers
468            .iter()
469            .filter(|(_, state)| state.is_stale(self.config.peer_timeout))
470            .map(|(id, _)| *id)
471            .collect()
472    }
473
474    /// Get peers that need heartbeats.
475    pub fn peers_needing_heartbeat(&self) -> Vec<Uuid> {
476        self.peers
477            .iter()
478            .filter(|(_, state)| {
479                matches!(
480                    state.connection_state,
481                    PeerConnectionState::Synced | PeerConnectionState::Syncing
482                ) && state
483                    .last_seen
484                    .map(|t| t.elapsed() > self.config.heartbeat_interval / 2)
485                    .unwrap_or(true)
486            })
487            .map(|(id, _)| *id)
488            .collect()
489    }
490}
491
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn current_timestamp_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
499
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a sync state around a freshly generated node ID.
    fn new_node() -> (Uuid, SyncState) {
        let id = Uuid::new_v4();
        (id, SyncState::new(id))
    }

    /// A new state keeps its node ID and starts without peers.
    #[test]
    fn test_sync_state_creation() {
        let (node_id, state) = new_node();

        assert_eq!(state.node_id, node_id);
        assert!(state.peers.is_empty());
    }

    /// Registering a peer makes it retrievable.
    #[test]
    fn test_peer_registration() {
        let (_, mut state) = new_node();
        let peer_id = Uuid::new_v4();

        state.register_peer(peer_id, VectorClock::new());

        assert!(state.get_peer(&peer_id).is_some());
        assert_eq!(state.peers.len(), 1);
    }

    /// A hello message carries the sender and the supplied name.
    #[test]
    fn test_create_hello_message() {
        let (node_id, mut state) = new_node();

        let msg = state.create_hello(Some("Test Node".to_string()));

        assert_eq!(msg.sender, node_id);
        match msg.payload {
            SyncPayload::Hello(info) => {
                assert_eq!(info.name, Some("Test Node".to_string()));
            }
            other => panic!("expected Hello payload, got {:?}", other),
        }
    }

    /// Receiving a hello registers the sender and produces a hello reply.
    #[test]
    fn test_handle_hello_message() {
        let (_, mut state1) = new_node();
        let (node2_id, mut state2) = new_node();

        // Node 2 greets node 1.
        let hello = state2.create_hello(Some("Node 2".to_string()));
        let response = state1.handle_message(&hello);

        // Node 1 now knows node 2 ...
        assert!(state1.get_peer(&node2_id).is_some());

        // ... and answers with a hello of its own.
        let reply = response.expect("hello should produce a reply");
        assert!(matches!(reply.payload, SyncPayload::Hello(_)));
    }

    /// Handling a message merges the sender's clock into the receiver's.
    #[test]
    fn test_clock_updates_on_message() {
        let (_, mut state1) = new_node();
        let (_, mut state2) = new_node();

        // Advance node 2 a few times before it sends anything.
        for _ in 0..3 {
            state2.tick();
        }

        let msg = state2.create_heartbeat();
        state1.handle_message(&msg);

        // Node 1 must no longer be strictly behind node 2.
        assert!(!state1.clock.happens_before(&state2.clock));
    }

    /// The default capability set has the expected values.
    #[test]
    fn test_peer_capabilities() {
        let caps = PeerCapabilities::default_capabilities();

        assert!(caps.compressed_deltas);
        assert!(caps.batch_operations);
        assert_eq!(caps.max_batch_size, 100);
    }

    /// A goodbye from a known peer marks it as gone.
    #[test]
    fn test_goodbye_handling() {
        let (_, mut state1) = new_node();
        let (node2_id, mut state2) = new_node();

        // The peer has to be known before it can be marked.
        state1.register_peer(node2_id, VectorClock::new());

        let goodbye = state2.create_goodbye();
        state1.handle_message(&goodbye);

        let peer = state1.get_peer(&node2_id).unwrap();
        assert_eq!(peer.connection_state, PeerConnectionState::Gone);
    }

    /// Delta request/response constructors produce the right payloads.
    #[test]
    fn test_delta_request_response() {
        let (_, mut state) = new_node();

        let delta_req = state.create_delta_request(VectorClock::new());
        assert!(matches!(
            delta_req.payload,
            SyncPayload::DeltaRequest { .. }
        ));

        let delta_resp = state.create_delta_response(DeltaBatch::new(), false);
        assert!(matches!(
            delta_resp.payload,
            SyncPayload::DeltaResponse {
                has_more: false,
                ..
            }
        ));
    }

    /// Acknowledging a pending message yields an RTT estimate and clears
    /// the pending entry.
    #[test]
    fn test_ack_rtt_tracking() {
        let (_, mut state) = new_node();
        let peer_id = Uuid::new_v4();
        state.register_peer(peer_id, VectorClock::new());

        // Pretend a message was sent ...
        let peer = state.get_peer_mut(&peer_id).unwrap();
        peer.expect_ack(42);

        // ... and acknowledged a little later.
        std::thread::sleep(Duration::from_millis(10));
        peer.receive_ack(42);

        assert!(peer.rtt_estimate.is_some());
        assert!(peer.pending_acks.is_empty());
    }
}