leptos_sync_core/transport/
message_protocol.rs

1//! WebSocket message protocol for CRDT synchronization
2
3use crate::crdt::ReplicaId;
4use serde::{Deserialize, Serialize};
5use std::time::SystemTime;
6
7/// CRDT types that can be synchronized
8#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
9pub enum CrdtType {
10    LwwRegister,
11    LwwMap,
12    GCounter,
13    RGA,
14    LSEQ,
15    YjsTree,
16    DAG,
17    Graph,
18    Tree,
19}
20
21/// User information for peer identification
22#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
23pub struct UserInfo {
24    pub user_id: String,
25    pub username: Option<String>,
26    pub display_name: Option<String>,
27    pub avatar_url: Option<String>,
28}
29
30/// WebSocket synchronization message protocol
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub enum SyncMessage {
33    /// CRDT delta message
34    Delta {
35        collection_id: String,
36        crdt_type: CrdtType,
37        delta: Vec<u8>,
38        timestamp: SystemTime,
39        replica_id: ReplicaId,
40    },
41    /// Heartbeat message to maintain connection
42    Heartbeat {
43        replica_id: ReplicaId,
44        timestamp: SystemTime,
45    },
46    /// Peer joining the session
47    PeerJoin {
48        replica_id: ReplicaId,
49        user_info: Option<UserInfo>,
50    },
51    /// Peer leaving the session
52    PeerLeave { replica_id: ReplicaId },
53    /// Welcome message from server
54    Welcome {
55        peer_id: ReplicaId,
56        timestamp: SystemTime,
57        server_info: Option<ServerInfo>,
58    },
59    /// Presence update
60    Presence {
61        peer_id: ReplicaId,
62        action: PresenceAction,
63        timestamp: SystemTime,
64    },
65    /// Binary data acknowledgment
66    BinaryAck {
67        peer_id: ReplicaId,
68        size: usize,
69        timestamp: SystemTime,
70    },
71}
72
73/// Server information
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct ServerInfo {
76    pub version: String,
77    pub max_connections: Option<usize>,
78    pub features: Vec<String>,
79}
80
81/// Presence action types
82#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
83pub enum PresenceAction {
84    Join,
85    Leave,
86    Update,
87}
88
89/// Message wrapper with protocol versioning
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct MessageWrapper {
92    pub version: u32,
93    pub message: SyncMessage,
94    pub message_id: Option<String>,
95}
96
97impl MessageWrapper {
98    pub const PROTOCOL_VERSION: u32 = 1;
99
100    pub fn new(message: SyncMessage) -> Self {
101        Self {
102            version: Self::PROTOCOL_VERSION,
103            message,
104            message_id: None,
105        }
106    }
107
108    pub fn with_id(message: SyncMessage, message_id: String) -> Self {
109        Self {
110            version: Self::PROTOCOL_VERSION,
111            message,
112            message_id: Some(message_id),
113        }
114    }
115}
116
117/// Message serialization/deserialization utilities
118pub struct MessageCodec;
119
120impl MessageCodec {
121    /// Serialize a message to JSON bytes
122    pub fn serialize(message: &SyncMessage) -> Result<Vec<u8>, serde_json::Error> {
123        let wrapper = MessageWrapper::new(message.clone());
124        serde_json::to_vec(&wrapper)
125    }
126
127    /// Deserialize JSON bytes to a message
128    pub fn deserialize(data: &[u8]) -> Result<SyncMessage, serde_json::Error> {
129        let wrapper: MessageWrapper = serde_json::from_slice(data)?;
130
131        // Check protocol version compatibility
132        if wrapper.version > MessageWrapper::PROTOCOL_VERSION {
133            return Err(serde_json::Error::io(std::io::Error::new(
134                std::io::ErrorKind::InvalidData,
135                format!(
136                    "Unsupported protocol version: {} (supported: {})",
137                    wrapper.version,
138                    MessageWrapper::PROTOCOL_VERSION
139                ),
140            )));
141        }
142
143        Ok(wrapper.message)
144    }
145
146    /// Serialize a message with compression (if enabled)
147    pub fn serialize_compressed(
148        message: &SyncMessage,
149    ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
150        let json_data = Self::serialize(message)?;
151
152        // For now, just return JSON data
153        // TODO: Add compression support when compression feature is enabled
154        Ok(json_data)
155    }
156
157    /// Deserialize compressed message data
158    pub fn deserialize_compressed(data: &[u8]) -> Result<SyncMessage, Box<dyn std::error::Error>> {
159        // For now, just deserialize as JSON
160        // TODO: Add decompression support when compression feature is enabled
161        Self::deserialize(data).map_err(|e| e.into())
162    }
163}
164
165#[cfg(test)]
166mod tests {
167    use super::*;
168    use crate::crdt::ReplicaId;
169    use std::time::UNIX_EPOCH;
170
171    fn create_test_replica_id() -> ReplicaId {
172        ReplicaId::from(uuid::Uuid::new_v4())
173    }
174
175    #[test]
176    fn test_message_serialization() {
177        let replica_id = create_test_replica_id();
178        let message = SyncMessage::Heartbeat {
179            replica_id: replica_id.clone(),
180            timestamp: UNIX_EPOCH,
181        };
182
183        let serialized = MessageCodec::serialize(&message).unwrap();
184        let deserialized = MessageCodec::deserialize(&serialized).unwrap();
185
186        match (message, deserialized) {
187            (
188                SyncMessage::Heartbeat {
189                    replica_id: id1,
190                    timestamp: t1,
191                },
192                SyncMessage::Heartbeat {
193                    replica_id: id2,
194                    timestamp: t2,
195                },
196            ) => {
197                assert_eq!(id1, id2);
198                assert_eq!(t1, t2);
199            }
200            _ => panic!("Message types don't match"),
201        }
202    }
203
204    #[test]
205    fn test_delta_message() {
206        let replica_id = create_test_replica_id();
207        let delta_data = b"test delta data".to_vec();
208
209        let message = SyncMessage::Delta {
210            collection_id: "test_collection".to_string(),
211            crdt_type: CrdtType::LwwRegister,
212            delta: delta_data.clone(),
213            timestamp: UNIX_EPOCH,
214            replica_id: replica_id.clone(),
215        };
216
217        let serialized = MessageCodec::serialize(&message).unwrap();
218        let deserialized = MessageCodec::deserialize(&serialized).unwrap();
219
220        match deserialized {
221            SyncMessage::Delta {
222                collection_id,
223                crdt_type,
224                delta,
225                timestamp,
226                replica_id: id,
227            } => {
228                assert_eq!(collection_id, "test_collection");
229                assert_eq!(crdt_type, CrdtType::LwwRegister);
230                assert_eq!(delta, delta_data);
231                assert_eq!(timestamp, UNIX_EPOCH);
232                assert_eq!(id, replica_id);
233            }
234            _ => panic!("Expected Delta message"),
235        }
236    }
237
238    #[test]
239    fn test_peer_join_message() {
240        let replica_id = create_test_replica_id();
241        let user_info = UserInfo {
242            user_id: "user123".to_string(),
243            username: Some("testuser".to_string()),
244            display_name: Some("Test User".to_string()),
245            avatar_url: Some("https://example.com/avatar.png".to_string()),
246        };
247
248        let message = SyncMessage::PeerJoin {
249            replica_id: replica_id.clone(),
250            user_info: Some(user_info.clone()),
251        };
252
253        let serialized = MessageCodec::serialize(&message).unwrap();
254        let deserialized = MessageCodec::deserialize(&serialized).unwrap();
255
256        match deserialized {
257            SyncMessage::PeerJoin {
258                replica_id: id,
259                user_info: info,
260            } => {
261                assert_eq!(id, replica_id);
262                assert_eq!(info, Some(user_info));
263            }
264            _ => panic!("Expected PeerJoin message"),
265        }
266    }
267
268    #[test]
269    fn test_message_wrapper() {
270        let replica_id = create_test_replica_id();
271        let message = SyncMessage::Heartbeat {
272            replica_id,
273            timestamp: UNIX_EPOCH,
274        };
275
276        let wrapper = MessageWrapper::new(message.clone());
277        assert_eq!(wrapper.version, MessageWrapper::PROTOCOL_VERSION);
278        assert_eq!(wrapper.message_id, None);
279
280        let wrapper_with_id = MessageWrapper::with_id(message, "msg123".to_string());
281        assert_eq!(wrapper_with_id.version, MessageWrapper::PROTOCOL_VERSION);
282        assert_eq!(wrapper_with_id.message_id, Some("msg123".to_string()));
283    }
284
285    #[test]
286    fn test_compressed_serialization() {
287        let replica_id = create_test_replica_id();
288        let message = SyncMessage::Heartbeat {
289            replica_id,
290            timestamp: UNIX_EPOCH,
291        };
292
293        // Test that compressed serialization works (currently just JSON)
294        let compressed = MessageCodec::serialize_compressed(&message).unwrap();
295        let decompressed = MessageCodec::deserialize_compressed(&compressed).unwrap();
296
297        match (message, decompressed) {
298            (
299                SyncMessage::Heartbeat {
300                    replica_id: id1,
301                    timestamp: t1,
302                },
303                SyncMessage::Heartbeat {
304                    replica_id: id2,
305                    timestamp: t2,
306                },
307            ) => {
308                assert_eq!(id1, id2);
309                assert_eq!(t1, t2);
310            }
311            _ => panic!("Message types don't match"),
312        }
313    }
314}