leptos_sync_core/transport/
message_protocol.rs1use crate::crdt::ReplicaId;
4use serde::{Deserialize, Serialize};
5use std::time::SystemTime;
6
7#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
9pub enum CrdtType {
10 LwwRegister,
11 LwwMap,
12 GCounter,
13 RGA,
14 LSEQ,
15 YjsTree,
16 DAG,
17 Graph,
18 Tree,
19}
20
21#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
23pub struct UserInfo {
24 pub user_id: String,
25 pub username: Option<String>,
26 pub display_name: Option<String>,
27 pub avatar_url: Option<String>,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub enum SyncMessage {
33 Delta {
35 collection_id: String,
36 crdt_type: CrdtType,
37 delta: Vec<u8>,
38 timestamp: SystemTime,
39 replica_id: ReplicaId,
40 },
41 Heartbeat {
43 replica_id: ReplicaId,
44 timestamp: SystemTime,
45 },
46 PeerJoin {
48 replica_id: ReplicaId,
49 user_info: Option<UserInfo>,
50 },
51 PeerLeave { replica_id: ReplicaId },
53 Welcome {
55 peer_id: ReplicaId,
56 timestamp: SystemTime,
57 server_info: Option<ServerInfo>,
58 },
59 Presence {
61 peer_id: ReplicaId,
62 action: PresenceAction,
63 timestamp: SystemTime,
64 },
65 BinaryAck {
67 peer_id: ReplicaId,
68 size: usize,
69 timestamp: SystemTime,
70 },
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct ServerInfo {
76 pub version: String,
77 pub max_connections: Option<usize>,
78 pub features: Vec<String>,
79}
80
81#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
83pub enum PresenceAction {
84 Join,
85 Leave,
86 Update,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct MessageWrapper {
92 pub version: u32,
93 pub message: SyncMessage,
94 pub message_id: Option<String>,
95}
96
97impl MessageWrapper {
98 pub const PROTOCOL_VERSION: u32 = 1;
99
100 pub fn new(message: SyncMessage) -> Self {
101 Self {
102 version: Self::PROTOCOL_VERSION,
103 message,
104 message_id: None,
105 }
106 }
107
108 pub fn with_id(message: SyncMessage, message_id: String) -> Self {
109 Self {
110 version: Self::PROTOCOL_VERSION,
111 message,
112 message_id: Some(message_id),
113 }
114 }
115}
116
117pub struct MessageCodec;
119
120impl MessageCodec {
121 pub fn serialize(message: &SyncMessage) -> Result<Vec<u8>, serde_json::Error> {
123 let wrapper = MessageWrapper::new(message.clone());
124 serde_json::to_vec(&wrapper)
125 }
126
127 pub fn deserialize(data: &[u8]) -> Result<SyncMessage, serde_json::Error> {
129 let wrapper: MessageWrapper = serde_json::from_slice(data)?;
130
131 if wrapper.version > MessageWrapper::PROTOCOL_VERSION {
133 return Err(serde_json::Error::io(std::io::Error::new(
134 std::io::ErrorKind::InvalidData,
135 format!(
136 "Unsupported protocol version: {} (supported: {})",
137 wrapper.version,
138 MessageWrapper::PROTOCOL_VERSION
139 ),
140 )));
141 }
142
143 Ok(wrapper.message)
144 }
145
146 pub fn serialize_compressed(
148 message: &SyncMessage,
149 ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
150 let json_data = Self::serialize(message)?;
151
152 Ok(json_data)
155 }
156
157 pub fn deserialize_compressed(data: &[u8]) -> Result<SyncMessage, Box<dyn std::error::Error>> {
159 Self::deserialize(data).map_err(|e| e.into())
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use super::*;
168 use crate::crdt::ReplicaId;
169 use std::time::UNIX_EPOCH;
170
171 fn create_test_replica_id() -> ReplicaId {
172 ReplicaId::from(uuid::Uuid::new_v4())
173 }
174
175 #[test]
176 fn test_message_serialization() {
177 let replica_id = create_test_replica_id();
178 let message = SyncMessage::Heartbeat {
179 replica_id: replica_id.clone(),
180 timestamp: UNIX_EPOCH,
181 };
182
183 let serialized = MessageCodec::serialize(&message).unwrap();
184 let deserialized = MessageCodec::deserialize(&serialized).unwrap();
185
186 match (message, deserialized) {
187 (
188 SyncMessage::Heartbeat {
189 replica_id: id1,
190 timestamp: t1,
191 },
192 SyncMessage::Heartbeat {
193 replica_id: id2,
194 timestamp: t2,
195 },
196 ) => {
197 assert_eq!(id1, id2);
198 assert_eq!(t1, t2);
199 }
200 _ => panic!("Message types don't match"),
201 }
202 }
203
204 #[test]
205 fn test_delta_message() {
206 let replica_id = create_test_replica_id();
207 let delta_data = b"test delta data".to_vec();
208
209 let message = SyncMessage::Delta {
210 collection_id: "test_collection".to_string(),
211 crdt_type: CrdtType::LwwRegister,
212 delta: delta_data.clone(),
213 timestamp: UNIX_EPOCH,
214 replica_id: replica_id.clone(),
215 };
216
217 let serialized = MessageCodec::serialize(&message).unwrap();
218 let deserialized = MessageCodec::deserialize(&serialized).unwrap();
219
220 match deserialized {
221 SyncMessage::Delta {
222 collection_id,
223 crdt_type,
224 delta,
225 timestamp,
226 replica_id: id,
227 } => {
228 assert_eq!(collection_id, "test_collection");
229 assert_eq!(crdt_type, CrdtType::LwwRegister);
230 assert_eq!(delta, delta_data);
231 assert_eq!(timestamp, UNIX_EPOCH);
232 assert_eq!(id, replica_id);
233 }
234 _ => panic!("Expected Delta message"),
235 }
236 }
237
238 #[test]
239 fn test_peer_join_message() {
240 let replica_id = create_test_replica_id();
241 let user_info = UserInfo {
242 user_id: "user123".to_string(),
243 username: Some("testuser".to_string()),
244 display_name: Some("Test User".to_string()),
245 avatar_url: Some("https://example.com/avatar.png".to_string()),
246 };
247
248 let message = SyncMessage::PeerJoin {
249 replica_id: replica_id.clone(),
250 user_info: Some(user_info.clone()),
251 };
252
253 let serialized = MessageCodec::serialize(&message).unwrap();
254 let deserialized = MessageCodec::deserialize(&serialized).unwrap();
255
256 match deserialized {
257 SyncMessage::PeerJoin {
258 replica_id: id,
259 user_info: info,
260 } => {
261 assert_eq!(id, replica_id);
262 assert_eq!(info, Some(user_info));
263 }
264 _ => panic!("Expected PeerJoin message"),
265 }
266 }
267
268 #[test]
269 fn test_message_wrapper() {
270 let replica_id = create_test_replica_id();
271 let message = SyncMessage::Heartbeat {
272 replica_id,
273 timestamp: UNIX_EPOCH,
274 };
275
276 let wrapper = MessageWrapper::new(message.clone());
277 assert_eq!(wrapper.version, MessageWrapper::PROTOCOL_VERSION);
278 assert_eq!(wrapper.message_id, None);
279
280 let wrapper_with_id = MessageWrapper::with_id(message, "msg123".to_string());
281 assert_eq!(wrapper_with_id.version, MessageWrapper::PROTOCOL_VERSION);
282 assert_eq!(wrapper_with_id.message_id, Some("msg123".to_string()));
283 }
284
285 #[test]
286 fn test_compressed_serialization() {
287 let replica_id = create_test_replica_id();
288 let message = SyncMessage::Heartbeat {
289 replica_id,
290 timestamp: UNIX_EPOCH,
291 };
292
293 let compressed = MessageCodec::serialize_compressed(&message).unwrap();
295 let decompressed = MessageCodec::deserialize_compressed(&compressed).unwrap();
296
297 match (message, decompressed) {
298 (
299 SyncMessage::Heartbeat {
300 replica_id: id1,
301 timestamp: t1,
302 },
303 SyncMessage::Heartbeat {
304 replica_id: id2,
305 timestamp: t2,
306 },
307 ) => {
308 assert_eq!(id1, id2);
309 assert_eq!(t1, t2);
310 }
311 _ => panic!("Message types don't match"),
312 }
313 }
314}