leptos_sync_core/transport/
compatibility_layer.rs1use super::{SyncTransport, TransportError};
7use crate::transport::leptos_ws_pro_transport::{LeptosWsProTransport, LeptosWsProConfig, MessageProtocolAdapter};
8use serde::{Deserialize, Serialize};
9use thiserror::Error;
10
11#[derive(Error, Debug)]
12pub enum CompatibilityError {
13 #[error("Transport error: {0}")]
14 Transport(#[from] TransportError),
15 #[error("Serialization error: {0}")]
16 Serialization(String),
17 #[error("Protocol error: {0}")]
18 Protocol(String),
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(tag = "type")]
24pub enum SyncMessage {
25 #[serde(rename = "sync")]
26 Sync {
27 peer_id: String,
28 data: serde_json::Value,
29 timestamp: String,
30 },
31 #[serde(rename = "presence")]
32 Presence {
33 peer_id: String,
34 action: String,
35 timestamp: String,
36 },
37 #[serde(rename = "heartbeat")]
38 Heartbeat {
39 timestamp: String,
40 },
41 #[serde(rename = "welcome")]
42 Welcome {
43 peer_id: String,
44 timestamp: String,
45 server_info: ServerInfo,
46 },
47 #[serde(rename = "binary_ack")]
48 BinaryAck {
49 peer_id: String,
50 size: usize,
51 timestamp: String,
52 },
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct ServerInfo {
57 pub version: String,
58 pub max_connections: usize,
59 pub heartbeat_interval: u64,
60}
61
62pub struct CompatibilityTransport {
65 leptos_ws_pro: LeptosWsProTransport,
66 adapter: MessageProtocolAdapter,
67 message_buffer: std::collections::VecDeque<Vec<u8>>,
68}
69
70impl CompatibilityTransport {
71 pub fn new(config: LeptosWsProConfig) -> Self {
73 let leptos_ws_pro = LeptosWsProTransport::new(config);
74 let adapter = MessageProtocolAdapter::new(leptos_ws_pro.clone());
75
76 Self {
77 leptos_ws_pro,
78 adapter,
79 message_buffer: std::collections::VecDeque::new(),
80 }
81 }
82
83 pub fn with_url(url: String) -> Self {
85 let config = LeptosWsProConfig {
86 url,
87 ..Default::default()
88 };
89 Self::new(config)
90 }
91
92 pub async fn connect(&self) -> Result<(), CompatibilityError> {
94 self.leptos_ws_pro.connect().await
95 .map_err(|e| CompatibilityError::Transport(e.into()))
96 }
97
98 pub async fn disconnect(&self) -> Result<(), CompatibilityError> {
100 self.leptos_ws_pro.disconnect().await
101 .map_err(|e| CompatibilityError::Transport(e.into()))
102 }
103
104 pub async fn send_sync(&self, peer_id: &str, data: serde_json::Value) -> Result<(), CompatibilityError> {
106 self.adapter.send_sync_message(peer_id, data).await
107 .map_err(|e| CompatibilityError::Transport(e.into()))
108 }
109
110 pub async fn send_presence(&self, peer_id: &str, action: &str) -> Result<(), CompatibilityError> {
112 self.adapter.send_presence_message(peer_id, action).await
113 .map_err(|e| CompatibilityError::Transport(e.into()))
114 }
115
116 pub async fn send_heartbeat(&self) -> Result<(), CompatibilityError> {
118 self.adapter.send_heartbeat().await
119 .map_err(|e| CompatibilityError::Transport(e.into()))
120 }
121
122 pub async fn receive_messages(&self) -> Result<Vec<SyncMessage>, CompatibilityError> {
124 let raw_messages = self.adapter.receive_messages().await
125 .map_err(|e| CompatibilityError::Transport(e.into()))?;
126
127 let mut parsed_messages = Vec::new();
128 for raw_message in raw_messages {
129 match serde_json::from_value::<SyncMessage>(raw_message) {
130 Ok(parsed) => parsed_messages.push(parsed),
131 Err(e) => {
132 tracing::warn!("Failed to parse sync message: {}", e);
133 continue;
134 }
135 }
136 }
137
138 Ok(parsed_messages)
139 }
140
141 pub fn leptos_ws_pro_transport(&self) -> &LeptosWsProTransport {
143 &self.leptos_ws_pro
144 }
145
146 pub fn message_adapter(&self) -> &MessageProtocolAdapter {
148 &self.adapter
149 }
150}
151
152impl SyncTransport for CompatibilityTransport {
153 type Error = TransportError;
154
155 fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
156 Box::pin(async move {
157 if let Ok(message) = serde_json::from_slice::<serde_json::Value>(data) {
159 if let Some(msg_type) = message.get("type").and_then(|t| t.as_str()) {
160 match msg_type {
161 "sync" => {
162 if let (Some(peer_id), Some(data)) = (
163 message.get("peer_id").and_then(|p| p.as_str()),
164 message.get("data")
165 ) {
166 return self.send_sync(peer_id, data.clone()).await
167 .map_err(|e| match e {
168 CompatibilityError::Transport(t) => t,
169 _ => TransportError::SendFailed(e.to_string()),
170 });
171 }
172 }
173 "presence" => {
174 if let (Some(peer_id), Some(action)) = (
175 message.get("peer_id").and_then(|p| p.as_str()),
176 message.get("action").and_then(|a| a.as_str())
177 ) {
178 return self.send_presence(peer_id, action).await
179 .map_err(|e| match e {
180 CompatibilityError::Transport(t) => t,
181 _ => TransportError::SendFailed(e.to_string()),
182 });
183 }
184 }
185 "heartbeat" => {
186 return self.send_heartbeat().await
187 .map_err(|e| match e {
188 CompatibilityError::Transport(t) => t,
189 _ => TransportError::SendFailed(e.to_string()),
190 });
191 }
192 _ => {
193 }
195 }
196 }
197 }
198
199 self.leptos_ws_pro.send(data).await
201 })
202 }
203
204 fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
205 Box::pin(async move {
206 let messages = self.receive_messages().await
208 .map_err(|e| match e {
209 CompatibilityError::Transport(t) => t,
210 _ => TransportError::ReceiveFailed(e.to_string()),
211 })?;
212
213 let mut raw_messages = Vec::new();
214 for message in messages {
215 match serde_json::to_vec(&message) {
216 Ok(raw) => raw_messages.push(raw),
217 Err(e) => {
218 tracing::warn!("Failed to serialize message: {}", e);
219 continue;
220 }
221 }
222 }
223
224 Ok(raw_messages)
225 })
226 }
227
228 fn is_connected(&self) -> bool {
229 self.leptos_ws_pro.is_connected()
230 }
231}
232
233impl Clone for CompatibilityTransport {
234 fn clone(&self) -> Self {
235 Self {
236 leptos_ws_pro: self.leptos_ws_pro.clone(),
237 adapter: MessageProtocolAdapter::new(self.leptos_ws_pro.clone()),
238 message_buffer: std::collections::VecDeque::new(),
239 }
240 }
241}
242
243pub struct MigrationHelper {
248 new_transport: CompatibilityTransport,
249 migration_complete: bool,
250}
251
252impl MigrationHelper {
253 pub fn new(config: LeptosWsProConfig) -> Self {
255 Self {
256 new_transport: CompatibilityTransport::new(config),
257 migration_complete: false,
258 }
259 }
260
261 pub async fn migrate(&mut self) -> Result<(), CompatibilityError> {
263 self.new_transport.connect().await?;
265
266 self.migration_complete = true;
268
269 Ok(())
270 }
271
272 pub fn is_migration_complete(&self) -> bool {
274 self.migration_complete
275 }
276
277 pub fn new_transport(&self) -> &CompatibilityTransport {
279 &self.new_transport
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286 use crate::transport::InMemoryTransport;
287
288 #[tokio::test]
289 async fn test_compatibility_transport_creation() {
290 let config = LeptosWsProConfig::default();
291 let transport = CompatibilityTransport::new(config);
292
293 assert!(!transport.is_connected());
294 }
295
296 #[tokio::test]
297 async fn test_sync_message_parsing() {
298 let config = LeptosWsProConfig::default();
299 let transport = CompatibilityTransport::new(config);
300
301 let sync_data = serde_json::json!({
303 "changes": ["change1", "change2"],
304 "client_id": "test_client"
305 });
306
307 let result = transport.send_sync("test_peer", sync_data).await;
308 assert!(result.is_err()); }
310
311 #[tokio::test]
312 async fn test_presence_message_parsing() {
313 let config = LeptosWsProConfig::default();
314 let transport = CompatibilityTransport::new(config);
315
316 let result = transport.send_presence("test_peer", "connected").await;
317 assert!(result.is_err()); }
319
320 #[tokio::test]
321 async fn test_heartbeat_message_parsing() {
322 let config = LeptosWsProConfig::default();
323 let transport = CompatibilityTransport::new(config);
324
325 let result = transport.send_heartbeat().await;
326 assert!(result.is_err()); }
328
329 #[tokio::test]
330 async fn test_migration_helper() {
331 let config = LeptosWsProConfig {
332 url: "ws://invalid-url-that-does-not-exist:9999".to_string(),
333 ..Default::default()
334 };
335 let mut helper = MigrationHelper::new(config);
336
337 assert!(!helper.is_migration_complete());
338
339 let result = helper.migrate().await;
341 assert!(result.is_err()); }
343
344 #[tokio::test]
345 async fn test_sync_transport_trait_compliance() {
346 let config = LeptosWsProConfig::default();
347 let transport = CompatibilityTransport::new(config);
348
349 assert!(!transport.is_connected());
351
352 let data = b"trait compliance test";
354 let send_result = transport.send(data).await;
355 assert!(send_result.is_err()); let receive_result = transport.receive().await;
358 assert!(receive_result.is_ok());
359 }
360}